This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 02e3de35f0d [Improment]add internal workload group (#42006)
02e3de35f0d is described below

commit 02e3de35f0d1a2ae6014c3056548639eb951920f
Author: wangbo <wan...@selectdb.com>
AuthorDate: Thu Nov 14 13:12:02 2024 +0800

    [Improment]add internal workload group (#42006)
    
    ## Proposed changes
    Add an internal workload group when Doris starts; currently it is mainly
    used to manage the CPU usage of compaction workloads.
---
 be/src/agent/cgroup_cpu_ctl.cpp                    |   6 +-
 be/src/agent/cgroup_cpu_ctl.h                      |   2 +-
 be/src/agent/topic_subscriber.cpp                  |   6 +-
 be/src/agent/workload_group_listener.cpp           |   2 +-
 be/src/cloud/cloud_storage_engine.cpp              |  31 ++-
 be/src/cloud/cloud_storage_engine.h                |   2 +-
 be/src/olap/olap_server.cpp                        |  77 ++++---
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/pipeline/task_scheduler.h                   |   4 +-
 be/src/runtime/exec_env_init.cpp                   |   4 +-
 be/src/runtime/workload_group/workload_group.cpp   |  85 +++++---
 be/src/runtime/workload_group/workload_group.h     |  25 ++-
 .../workload_group/workload_group_manager.cpp      |  23 +++
 .../workload_group/workload_group_manager.h        |  12 ++
 be/src/util/threadpool.cpp                         |   7 +-
 be/src/util/threadpool.h                           |   6 +-
 be/src/vec/exec/scan/scanner_scheduler.h           |   9 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |  11 +-
 .../doris/analysis/AlterWorkloadGroupStmt.java     |  20 +-
 .../doris/analysis/CreateWorkloadGroupStmt.java    |  18 +-
 .../doris/analysis/DropWorkloadGroupStmt.java      |   3 -
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +
 .../doris/catalog/InternalSchemaInitializer.java   |   5 +-
 .../java/org/apache/doris/common/FeConstants.java  |   3 +
 .../CreateInternalWorkloadGroupThread.java         |  55 +++++
 .../resource/workloadgroup/WorkloadGroup.java      |  59 +++++-
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 109 +++++++---
 .../workloadgroup/WorkloadGroupMgrTest.java        | 222 +++++++++++++++++++++
 .../apache/doris/utframe/TestWithFeService.java    |   1 +
 gensrc/thrift/BackendService.thrift                |   4 +
 .../workload_manager_p0/test_curd_wlg.groovy       |  29 +++
 31 files changed, 698 insertions(+), 149 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index e68535a708c..76b72f2c9d0 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -158,11 +158,11 @@ uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
     return _is_enable_cgroup_v2_in_env ? 100 : 1024;
 }
 
-std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t 
wg_id) {
+std::shared_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t 
wg_id) {
     if (_is_enable_cgroup_v2_in_env) {
-        return std::make_unique<CgroupV2CpuCtl>(wg_id);
+        return std::make_shared<CgroupV2CpuCtl>(wg_id);
     } else if (_is_enable_cgroup_v1_in_env) {
-        return std::make_unique<CgroupV1CpuCtl>(wg_id);
+        return std::make_shared<CgroupV1CpuCtl>(wg_id);
     }
     return nullptr;
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 84e191159f1..b23f1f4dd9c 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -52,7 +52,7 @@ public:
 
     static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
 
-    static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
+    static std::shared_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
 
     static bool is_a_valid_cgroup_path(std::string cg_path);
 
diff --git a/be/src/agent/topic_subscriber.cpp 
b/be/src/agent/topic_subscriber.cpp
index f62bdaef099..b470e1534e1 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -40,14 +40,12 @@ void TopicSubscriber::handle_topic_info(const 
TPublishTopicRequest& topic_reques
     // eg, update workload info may delay other listener, then we need add a 
thread here
     // to handle_topic_info asynchronous
     std::shared_lock lock(_listener_mtx);
-    LOG(INFO) << "[topic_publish]begin handle topic info";
     for (auto& listener_pair : _registered_listeners) {
         if (topic_request.topic_map.find(listener_pair.first) != 
topic_request.topic_map.end()) {
-            LOG(INFO) << "[topic_publish]begin handle topic " << 
listener_pair.first
-                      << ", size=" << 
topic_request.topic_map.at(listener_pair.first).size();
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "[topic_publish]finish handle topic " << 
listener_pair.first;
+            LOG(INFO) << "[topic_publish]finish handle topic " << 
listener_pair.first
+                      << ", size=" << 
topic_request.topic_map.at(listener_pair.first).size();
         }
     }
 }
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index f0f57869f25..7b688b7dcdf 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -59,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                 workload_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        wg->upsert_task_scheduler(&workload_group_info, _exec_env);
+        wg->upsert_task_scheduler(&workload_group_info);
 
         // 5 upsert io throttle
         wg->upsert_scan_io_throttle(&workload_group_info);
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 5d7b445917a..dc6abbac31b 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -231,7 +231,7 @@ Result<BaseTabletSPtr> 
CloudStorageEngine::get_tablet(int64_t tablet_id) {
     });
 }
 
-Status CloudStorageEngine::start_bg_threads() {
+Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> 
wg_sptr) {
     RETURN_IF_ERROR(Thread::create(
             "CloudStorageEngine", "refresh_s3_info_thread",
             [this]() { this->_refresh_storage_vault_info_thread_callback(); },
@@ -266,14 +266,27 @@ Status CloudStorageEngine::start_bg_threads() {
     // compaction tasks producer thread
     int base_thread_num = get_base_thread_num();
     int cumu_thread_num = get_cumu_thread_num();
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_thread_num)
-                            .set_max_threads(base_thread_num)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_thread_num)
-                            .set_max_threads(cumu_thread_num)
-                            .build(&_cumu_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_thread_num)
+                                .set_max_threads(base_thread_num)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_thread_num)
+                                .set_max_threads(cumu_thread_num)
+                                .build(&_cumu_compaction_thread_pool));
+    }
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "compaction_tasks_producer_thread",
             [this]() { this->_compaction_tasks_producer_callback(); },
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 92d2917a916..072b8366542 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -57,7 +57,7 @@ public:
 
     Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) override;
 
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) 
override;
 
     Status set_cluster_id(int32_t cluster_id) override {
         _effective_cluster_id = cluster_id;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index a0c5a05636b..736bdaa9930 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -210,7 +210,7 @@ static int32_t 
get_single_replica_compaction_threads_num(size_t data_dirs_num) {
     return threads_num;
 }
 
-Status StorageEngine::start_bg_threads() {
+Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) 
{
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "unused_rowset_monitor_thread",
             [this]() { this->_unused_rowset_monitor_thread_callback(); },
@@ -243,29 +243,60 @@ Status StorageEngine::start_bg_threads() {
     auto single_replica_compaction_threads =
             get_single_replica_compaction_threads_num(data_dirs.size());
 
-    RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
-                            .set_min_threads(base_compaction_threads)
-                            .set_max_threads(base_compaction_threads)
-                            .build(&_base_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
-                            .set_min_threads(cumu_compaction_threads)
-                            .set_max_threads(cumu_compaction_threads)
-                            .build(&_cumu_compaction_thread_pool));
-    RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
-                            .set_min_threads(single_replica_compaction_threads)
-                            .set_max_threads(single_replica_compaction_threads)
-                            .build(&_single_replica_compaction_thread_pool));
-
-    if (config::enable_segcompaction) {
-        RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
-                                
.set_min_threads(config::segcompaction_num_threads)
-                                
.set_max_threads(config::segcompaction_num_threads)
-                                .build(&_seg_compaction_thread_pool));
+    if (wg_sptr->get_cgroup_cpu_ctl_wptr().lock()) {
+        RETURN_IF_ERROR(ThreadPoolBuilder("gBaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("gCumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cumu_compaction_thread_pool));
+        
RETURN_IF_ERROR(ThreadPoolBuilder("gSingleReplicaCompactionTaskThreadPool")
+                                
.set_min_threads(single_replica_compaction_threads)
+                                
.set_max_threads(single_replica_compaction_threads)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                
.build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("gSegCompactionTaskThreadPool")
+                                    
.set_min_threads(config::segcompaction_num_threads)
+                                    
.set_max_threads(config::segcompaction_num_threads)
+                                    
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("gColdDataCompactionTaskThreadPool")
+                                
.set_min_threads(config::cold_data_compaction_thread_num)
+                                
.set_max_threads(config::cold_data_compaction_thread_num)
+                                
.set_cgroup_cpu_ctl(wg_sptr->get_cgroup_cpu_ctl_wptr())
+                                .build(&_cold_data_compaction_thread_pool));
+    } else {
+        RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+                                .set_min_threads(base_compaction_threads)
+                                .set_max_threads(base_compaction_threads)
+                                .build(&_base_compaction_thread_pool));
+        RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+                                .set_min_threads(cumu_compaction_threads)
+                                .set_max_threads(cumu_compaction_threads)
+                                .build(&_cumu_compaction_thread_pool));
+        
RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool")
+                                
.set_min_threads(single_replica_compaction_threads)
+                                
.set_max_threads(single_replica_compaction_threads)
+                                
.build(&_single_replica_compaction_thread_pool));
+
+        if (config::enable_segcompaction) {
+            RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool")
+                                    
.set_min_threads(config::segcompaction_num_threads)
+                                    
.set_max_threads(config::segcompaction_num_threads)
+                                    .build(&_seg_compaction_thread_pool));
+        }
+        RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
+                                
.set_min_threads(config::cold_data_compaction_thread_num)
+                                
.set_max_threads(config::cold_data_compaction_thread_num)
+                                .build(&_cold_data_compaction_thread_pool));
     }
-    RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
-                            
.set_min_threads(config::cold_data_compaction_thread_num)
-                            
.set_max_threads(config::cold_data_compaction_thread_num)
-                            .build(&_cold_data_compaction_thread_pool));
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 421c0eb352d..a2201589898 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -72,6 +72,7 @@ class ReportWorker;
 class CreateTabletRRIdxCache;
 struct DirInfo;
 class SnapshotManager;
+class WorkloadGroup;
 
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
 using SegCompactionCandidatesSharedPtr = 
std::shared_ptr<SegCompactionCandidates>;
@@ -105,7 +106,7 @@ public:
     virtual bool stopped() = 0;
 
     // start all background threads. This should be call after env is ready.
-    virtual Status start_bg_threads() = 0;
+    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = 
nullptr) = 0;
 
     virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id) = 0;
 
@@ -278,7 +279,7 @@ public:
         return _default_rowset_type;
     }
 
-    Status start_bg_threads() override;
+    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) 
override;
 
     // clear trash and snapshot file
     // option: update disk usage after sweep
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index bdb5bec1776..3c1b08063df 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -43,7 +43,7 @@ namespace doris::pipeline {
 
 class TaskScheduler {
 public:
-    TaskScheduler(int core_num, std::string name, CgroupCpuCtl* cgroup_cpu_ctl)
+    TaskScheduler(int core_num, std::string name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
             : _task_queue(core_num),
               _shutdown(false),
               _name(std::move(name)),
@@ -65,7 +65,7 @@ private:
     std::vector<bool> _markers;
     bool _shutdown;
     std::string _name;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(int index);
 };
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 75ec588aa50..0f0b677bb1c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -276,6 +276,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _pipeline_tracer_ctx = 
std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _workload_group_manager = new WorkloadGroupMgr();
+    _workload_group_manager->init_internal_workload_group();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -364,7 +365,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
         return st;
     }
     _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
-    if (st = _storage_engine->start_bg_threads(); !st.ok()) {
+    WorkloadGroupPtr internal_wg = _workload_group_manager->get_internal_wg();
+    if (st = _storage_engine->start_bg_threads(internal_wg); !st.ok()) {
         LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << 
st;
         return st;
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index c6a3c07adda..f62179273cf 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -50,7 +50,9 @@ const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
 const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 
-WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& wg_info) : 
WorkloadGroup(wg_info, true) {}
+
+WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool 
need_create_query_thread_pool)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
@@ -65,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_low_watermark(tg_info.spill_low_watermark),
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
-          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+          _need_create_query_thread_pool(need_create_query_thread_pool) {
     std::vector<DataDirInfo>& data_dir_list = 
io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -434,54 +437,60 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .remote_read_bytes_per_second = remote_read_bytes_per_second};
 }
 
-void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* 
exec_env) {
-    uint64_t tg_id = tg_info->id;
-    std::string tg_name = tg_info->name;
-    int cpu_hard_limit = tg_info->cpu_hard_limit;
-    uint64_t cpu_shares = tg_info->cpu_share;
-    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
-    int scan_thread_num = tg_info->scan_thread_num;
-    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
-    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+std::weak_ptr<CgroupCpuCtl> WorkloadGroup::get_cgroup_cpu_ctl_wptr() {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    return _cgroup_cpu_ctl;
+}
 
+void WorkloadGroup::create_cgroup_cpu_ctl() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    create_cgroup_cpu_ctl_no_lock();
+}
+
+void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
     if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
+        std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
CgroupCpuCtl::create_cgroup_cpu_ctl(_id);
         if (cgroup_cpu_ctl) {
             Status ret = cgroup_cpu_ctl->init();
             if (ret.ok()) {
                 _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
-                LOG(INFO) << "[upsert wg thread pool] cgroup init success, 
wg_id=" << tg_id;
+                LOG(INFO) << "[upsert wg thread pool] cgroup init success, 
wg_id=" << _id;
             } else {
-                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, 
wg_id=" << tg_id
+                LOG(INFO) << "[upsert wg thread pool] cgroup init failed, 
wg_id=" << _id
                           << ", reason=" << ret.to_string();
             }
         } else {
-            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " 
<< tg_id << " failed";
+            LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl 
wg_id=" << _id << " failed";
         }
     }
+}
 
-    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
-
+void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                               std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr) {
+    uint64_t wg_id = wg_info->id;
+    std::string wg_name = wg_info->name;
+    int scan_thread_num = wg_info->scan_thread_num;
+    int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
     if (_task_sched == nullptr) {
         int32_t executors_size = config::pipeline_executor_size;
         if (executors_size <= 0) {
             executors_size = CpuInfo::num_cores();
         }
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(executors_size, 
"Pipe_" + tg_name,
+                std::make_unique<pipeline::TaskScheduler>(executors_size, 
"Pipe_" + wg_name,
                                                           cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << wg_id;
         }
     }
 
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ wg_name,
                                                                       
cg_cpu_ctl_ptr);
         Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
                                            
config::doris_scanner_thread_pool_thread_num,
@@ -489,7 +498,7 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
-            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << tg_id;
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << wg_id;
         }
     }
     if (scan_thread_num > 0 && _scan_task_sched) {
@@ -501,7 +510,7 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         int remote_scan_thread_queue_size =
                 
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ tg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ wg_name,
                                                                       
cg_cpu_ctl_ptr);
         Status ret = remote_scan_scheduler->start(remote_max_thread_num,
                                                   
config::doris_scanner_min_thread_pool_thread_num,
@@ -510,7 +519,7 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
             LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start 
failed, gid="
-                      << tg_id;
+                      << wg_id;
         }
     }
     if (max_remote_scan_thread_num >= min_remote_scan_thread_num && 
_remote_scan_task_sched) {
@@ -532,7 +541,7 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                       : std::min(num_disk * min_threads,
                                                  num_cpus * 
config::wg_flush_thread_num_per_cpu);
 
-            std::string pool_name = "wg_flush_" + tg_name;
+            std::string pool_name = "wg_flush_" + wg_name;
             auto ret = ThreadPoolBuilder(pool_name)
                                .set_min_threads(min_threads)
                                .set_max_threads(max_threads)
@@ -540,17 +549,24 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                                .build(&thread_pool);
             if (!ret.ok()) {
                 LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
failed, gid="
-                          << tg_id;
+                          << wg_id;
             } else {
                 _memtable_flush_pool = std::move(thread_pool);
-                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
succ, gid=" << tg_id
+                LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
succ, gid=" << wg_id
                           << ", max thread num=" << max_threads
                           << ", min thread num=" << min_threads;
             }
         }
     }
+}
+
+void WorkloadGroup::upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info) {
+    uint64_t wg_id = wg_info->id;
+    int cpu_hard_limit = wg_info->cpu_hard_limit;
+    uint64_t cpu_shares = wg_info->cpu_share;
+    bool enable_cpu_hard_limit = wg_info->enable_cpu_hard_limit;
+    create_cgroup_cpu_ctl_no_lock();
 
-    // step 6: update cgroup cpu if needed
     if (_cgroup_cpu_ctl) {
         if (enable_cpu_hard_limit) {
             if (cpu_hard_limit > 0) {
@@ -560,15 +576,24 @@ void 
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
             } else {
                 LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit 
but value is "
                              "illegal: "
-                          << cpu_hard_limit << ", gid=" << tg_id;
+                          << cpu_hard_limit << ", gid=" << wg_id;
             }
         } else {
             _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
             _cgroup_cpu_ctl->update_cpu_hard_limit(
                     CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
         }
-        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                             
&(tg_info->cgroup_cpu_hard_limit));
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(wg_info->cgroup_cpu_shares),
+                                             
&(wg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) {
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    upsert_cgroup_cpu_ctl_no_lock(wg_info);
+
+    if (_need_create_query_thread_pool) {
+        upsert_thread_pool_no_lock(wg_info, _cgroup_cpu_ctl);
     }
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 2ba84ce982b..96b8a36df1c 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -58,6 +58,8 @@ class WorkloadGroup : public 
std::enable_shared_from_this<WorkloadGroup> {
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
+    explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool 
need_create_query_thread_pool);
+
     int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -165,7 +167,7 @@ public:
 
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool 
is_minor_gc);
 
-    void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
+    void upsert_task_scheduler(WorkloadGroupInfo* tg_info);
 
     void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
                              vectorized::SimplifiedScanScheduler** scan_sched,
@@ -198,18 +200,21 @@ public:
     }
     int64_t get_remote_scan_bytes_per_second();
 
-    CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
-        std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
-        return _cgroup_cpu_ctl.get();
-    }
-
     ThreadPool* get_memtable_flush_pool_ptr() {
         // no lock here because this is called by memtable flush,
         // to avoid lock competition with the workload thread pool's update
         return _memtable_flush_pool.get();
     }
+    void create_cgroup_cpu_ctl();
+
+    std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
 
 private:
+    void create_cgroup_cpu_ctl_no_lock();
+    void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
+    void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                    std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr);
+
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
     std::string _name;
@@ -240,7 +245,10 @@ private:
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
+    // _cgroup_cpu_ctl is used not only by the thread pools managed by this WorkloadGroup,
+    // but also by some global background thread pools that are not owned by it,
+    // so it must be a shared_ptr.
+    std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched 
{nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> 
_remote_scan_task_sched {nullptr};
@@ -249,6 +257,9 @@ private:
     std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
     std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
 
+    // some background workloads do not need a query thread pool to be created
+    const bool _need_create_query_thread_pool;
+
     // bvar metric
     std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
     std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 927d4d13814..4d32fc8700e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,6 +34,25 @@
 
 namespace doris {
 
+// Registers the built-in internal workload group (id INTERNAL_WORKLOAD_GROUP_ID,
+// name INTERNAL_WORKLOAD_GROUP_NAME) in _workload_groups if that id is not present
+// yet, then creates the cgroup cpu controller of the newly created group.
+void WorkloadGroupMgr::init_internal_workload_group() {
+    WorkloadGroupPtr internal_wg = nullptr;
+    {
+        // the lookup and the insertion happen under the exclusive lock
+        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
+        if (_workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID) == 
_workload_groups.end()) {
+            WorkloadGroupInfo internal_wg_info {
+                    .id = INTERNAL_WORKLOAD_GROUP_ID,
+                    .name = INTERNAL_WORKLOAD_GROUP_NAME,
+                    .cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value()};
+            // the second constructor argument is need_create_query_thread_pool
+            internal_wg = std::make_shared<WorkloadGroup>(internal_wg_info, 
false);
+            _workload_groups[internal_wg_info.id] = internal_wg;
+        }
+    }
+    // NOTE(review): internal_wg stays null when the id was already registered, so a
+    // second call would trip this DCHECK -- presumably this runs once at startup; confirm.
+    DCHECK(internal_wg != nullptr);
+    // create_cgroup_cpu_ctl() is invoked after _group_mutex has been released
+    if (internal_wg) {
+        internal_wg->create_cgroup_cpu_ctl();
+    }
+}
+
 WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
         const WorkloadGroupInfo& workload_group_info) {
     {
@@ -86,6 +105,10 @@ void 
WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
         old_wg_size = _workload_groups.size();
         for (auto iter = _workload_groups.begin(); iter != 
_workload_groups.end(); iter++) {
             uint64_t wg_id = iter->first;
+            // internal workload group created by BE can not be dropped
+            if (wg_id == INTERNAL_WORKLOAD_GROUP_ID) {
+                continue;
+            }
             auto workload_group_ptr = iter->second;
             if (used_wg_id.find(wg_id) == used_wg_id.end()) {
                 workload_group_ptr->shutdown();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index f76e98d2606..18a0687b373 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -36,11 +36,18 @@ class TaskScheduler;
 class MultiCoreTaskQueue;
 } // namespace pipeline
 
+// The internal workload group is used for Doris' internal workloads; currently mainly compaction.
+const static uint64_t INTERNAL_WORKLOAD_GROUP_ID =
+        static_cast<uint64_t>(TWorkloadType::type::INTERNAL);
+const static std::string INTERNAL_WORKLOAD_GROUP_NAME = "_internal";
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
     ~WorkloadGroupMgr() = default;
 
+    void init_internal_workload_group();
+
     WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& 
workload_group_info);
 
     void get_related_workload_groups(const std::function<bool(const 
WorkloadGroupPtr& ptr)>& pred,
@@ -64,6 +71,11 @@ public:
 
     void get_wg_resource_usage(vectorized::Block* block);
 
+    // Returns the internal workload group, or nullptr if it is not registered.
+    // find() is used instead of operator[] because operator[] inserts a default
+    // entry for a missing key, which would modify the map while only the shared
+    // (reader) lock is held.
+    WorkloadGroupPtr get_internal_wg() {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        auto iter = _workload_groups.find(INTERNAL_WORKLOAD_GROUP_ID);
+        if (iter == _workload_groups.end()) {
+            return nullptr;
+        }
+        return iter->second;
+    }
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 15fb36181d4..f5ea38515de 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -75,7 +75,8 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int 
max_queue_size) {
     return *this;
 }
 
-ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* 
cgroup_cpu_ctl) {
+ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(
+        std::weak_ptr<CgroupCpuCtl> cgroup_cpu_ctl) {
     _cgroup_cpu_ctl = cgroup_cpu_ctl;
     return *this;
 }
@@ -476,8 +477,8 @@ void ThreadPool::dispatch_thread() {
     _num_threads++;
     _num_threads_pending_start--;
 
-    if (_cgroup_cpu_ctl != nullptr) {
-        static_cast<void>(_cgroup_cpu_ctl->add_thread_to_cgroup());
+    if (std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_sptr = 
_cgroup_cpu_ctl.lock()) {
+        static_cast<void>(cg_cpu_ctl_sptr->add_thread_to_cgroup());
     }
 
     // Owned by this worker thread and added/removed from _idle_threads as 
needed.
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 5ce27e2f27b..9bd4a7246fb 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -107,7 +107,7 @@ public:
     ThreadPoolBuilder& set_min_threads(int min_threads);
     ThreadPoolBuilder& set_max_threads(int max_threads);
     ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
-    ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
+    ThreadPoolBuilder& set_cgroup_cpu_ctl(std::weak_ptr<CgroupCpuCtl> 
cgroup_cpu_ctl);
     template <class Rep, class Period>
     ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, 
Period>& idle_timeout) {
         _idle_timeout = 
std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
@@ -133,7 +133,7 @@ private:
     int _min_threads;
     int _max_threads;
     int _max_queue_size;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::chrono::milliseconds _idle_timeout;
 
     ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
@@ -345,7 +345,7 @@ private:
     // Protected by _lock.
     int _total_queued_tasks;
 
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     // All allocated tokens.
     //
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 56c49368598..7731b3ba8f9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,11 +114,8 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* 
cgroup_cpu_ctl) {
-        _is_stop.store(false);
-        _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _sched_name = sched_name;
-    }
+    SimplifiedScanScheduler(std::string sched_name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
+            : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), 
_sched_name(sched_name) {}
 
     ~SimplifiedScanScheduler() {
         stop();
@@ -217,7 +214,7 @@ public:
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;
-    CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
+    std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::string _sched_name;
 };
 
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 432ec1c54b5..c17b84b2dbe 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -107,12 +107,13 @@ void AsyncResultWriter::process_block(RuntimeState* 
state, RuntimeProfile* profi
         force_close(status);
     }
 
-    if (state && state->get_query_ctx()) {
-        WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
-        if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
-            Status ret = 
wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
+    if (state && state->get_query_ctx() && 
state->get_query_ctx()->workload_group()) {
+        if (auto cg_ctl_sptr =
+                    
state->get_query_ctx()->workload_group()->get_cgroup_cpu_ctl_wptr().lock()) {
+            Status ret = cg_ctl_sptr->add_thread_to_cgroup();
             if (ret.ok()) {
-                std::string wg_tname = "asyc_wr_" + wg_ptr->name();
+                std::string wg_tname =
+                        "asyc_wr_" + 
state->get_query_ctx()->workload_group()->name();
                 Thread::set_self_name(wg_tname);
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
index 8cba792dd39..4405da6ce13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterWorkloadGroupStmt.java
@@ -25,6 +25,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -55,14 +59,26 @@ public class AlterWorkloadGroupStmt extends DdlStmt 
implements NotFallbackInPars
         }
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be 
null");
+            throw new AnalysisException("Workload Group properties can't be 
empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can 
not be create or modified ");
+        }
+
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && 
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || 
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + 
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
 
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
-        sb.append("ALTER RESOURCE GROUP 
'").append(workloadGroupName).append("' ");
+        sb.append("ALTER WORKLOAD GROUP 
'").append(workloadGroupName).append("' ");
         sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", 
true, false)).append(")");
         return sb.toString();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
index fc4f99046d5..dd13542a836 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java
@@ -27,6 +27,9 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
+import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 
@@ -68,12 +71,19 @@ public class CreateWorkloadGroupStmt extends DdlStmt 
implements NotFallbackInPar
         FeNameFormat.checkWorkloadGroupName(workloadGroupName);
 
         if (properties == null || properties.isEmpty()) {
-            throw new AnalysisException("Resource group properties can't be 
null");
+            throw new AnalysisException("Workload Group properties can't be 
empty");
+        }
+
+        if (properties.containsKey(WorkloadGroup.INTERNAL_TYPE)) {
+            throw new AnalysisException(WorkloadGroup.INTERNAL_TYPE + " can 
not be create or modified ");
         }
 
-        String wgTag = properties.get(WorkloadGroup.TAG);
-        if (wgTag != null) {
-            FeNameFormat.checkCommonName("workload group tag", wgTag);
+        String tagStr = properties.get(WorkloadGroup.TAG);
+        if (!StringUtils.isEmpty(tagStr) && 
(WorkloadGroupMgr.DEFAULT_GROUP_NAME.equals(workloadGroupName)
+                || 
WorkloadGroupMgr.INTERNAL_GROUP_NAME.equals(workloadGroupName))) {
+            throw new AnalysisException(
+                    WorkloadGroupMgr.INTERNAL_GROUP_NAME + " and " + 
WorkloadGroupMgr.DEFAULT_GROUP_NAME
+                            + " group can not set tag");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
index e4e3055f128..9356c6b5c4b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropWorkloadGroupStmt.java
@@ -20,7 +20,6 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
@@ -50,8 +49,6 @@ public class DropWorkloadGroupStmt extends DdlStmt implements 
NotFallbackInParse
         if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
         }
-
-        FeNameFormat.checkWorkloadGroupName(workloadGroupName);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 12ac201eb65..7fb12b9621a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -250,6 +250,7 @@ import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
+import 
org.apache.doris.resource.workloadgroup.CreateInternalWorkloadGroupThread;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -1874,6 +1875,7 @@ public class Env {
         WorkloadSchedPolicyPublisher wpPublisher = new 
WorkloadSchedPolicyPublisher(this);
         topicPublisherThread.addToTopicPublisherList(wpPublisher);
         topicPublisherThread.start();
+        new CreateInternalWorkloadGroupThread().start();
 
         // auto analyze related threads.
         statisticsCleaner.start();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 2dc6a90d593..cfc9d85c6a8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -244,10 +244,11 @@ public class InternalSchemaInitializer extends Thread {
         // statistics
         Env.getCurrentEnv().getInternalCatalog().createTable(
                 
buildStatisticsTblStmt(StatisticConstants.TABLE_STATISTIC_TBL_NAME,
-                    Lists.newArrayList("id", "catalog_id", "db_id", "tbl_id", 
"idx_id", "col_id", "part_id")));
+                        Lists.newArrayList("id", "catalog_id", "db_id", 
"tbl_id", "idx_id", "col_id", "part_id")));
         Env.getCurrentEnv().getInternalCatalog().createTable(
                 
buildStatisticsTblStmt(StatisticConstants.PARTITION_STATISTIC_TBL_NAME,
-                    Lists.newArrayList("catalog_id", "db_id", "tbl_id", 
"idx_id", "part_name", "part_id", "col_id")));
+                        Lists.newArrayList("catalog_id", "db_id", "tbl_id", 
"idx_id", "part_name", "part_id",
+                                "col_id")));
         // audit table
         
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 1c24ca69d4f..214fbe0e410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -50,6 +50,9 @@ public class FeConstants {
     // set to false to disable internal schema db
     public static boolean enableInternalSchemaDb = true;
 
+    // for UT: when false, CreateInternalWorkloadGroupThread returns without creating the group
+    public static boolean shouldCreateInternalWorkloadGroup = true;
+
     // default scheduler interval is 10 seconds
     public static int default_scheduler_interval_millisecond = 10000;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
new file mode 100644
index 00000000000..7c6d0e3a080
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/CreateInternalWorkloadGroupThread.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.workloadgroup;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * One-shot thread that waits until the Env reports ready and then creates the
+ * internal workload group if no group named WorkloadGroupMgr.INTERNAL_GROUP_NAME
+ * exists yet. Failures are logged and never propagated out of the thread.
+ */
+public class CreateInternalWorkloadGroupThread extends Thread {
+
+    private static final Logger LOG = LogManager.getLogger(CreateInternalWorkloadGroupThread.class);
+
+    public CreateInternalWorkloadGroupThread() {
+        super("CreateInternalWorkloadGroupThread");
+    }
+
+    @Override
+    public void run() {
+        // this flag can be switched off (it is documented as a UT switch) to skip creation
+        if (!FeConstants.shouldCreateInternalWorkloadGroup) {
+            return;
+        }
+        try {
+            Env env = Env.getCurrentEnv();
+            // poll every 5 seconds until the Env is ready
+            while (!env.isReady()) {
+                Thread.sleep(5000);
+            }
+            if (!env.getWorkloadGroupMgr()
+                    .isWorkloadGroupExists(WorkloadGroupMgr.INTERNAL_GROUP_NAME)) {
+                env.getWorkloadGroupMgr().createInternalWorkloadGroup();
+                LOG.info("create internal workload group succ");
+            } else {
+                LOG.info("internal workload group already exists.");
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt status instead of silently swallowing it
+            Thread.currentThread().interrupt();
+            LOG.warn("create internal workload group failed. ", e);
+        } catch (Throwable t) {
+            LOG.warn("create internal workload group failed. ", t);
+        }
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 44fb98e10ef..7d5e792ef71 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -30,8 +30,10 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
@@ -43,8 +45,11 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 public class WorkloadGroup implements Writable, GsonPostProcessable {
     private static final Logger LOG = 
LogManager.getLogger(WorkloadGroup.class);
@@ -79,6 +84,11 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String REMOTE_READ_BYTES_PER_SECOND = 
"remote_read_bytes_per_second";
 
+    // Property used to mark one of Doris's internal workload groups.
+    // Currently the only type is "internal", which only covers compaction;
+    // more types and workloads may be added in the future.
+    public static final String INTERNAL_TYPE = "internal_type";
+
     // NOTE(wb): all property is not required, some properties default value 
is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), 
enable_memory_overcommit=true
@@ -87,7 +97,10 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).add(INTERNAL_TYPE).build();
+
+    public static final ImmutableMap<String, Integer> WORKLOAD_TYPE_MAP = new 
ImmutableMap.Builder<String, Integer>()
+            .put(TWorkloadType.INTERNAL.toString().toLowerCase(), 
TWorkloadType.INTERNAL.getValue()).build();
 
     public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
     public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@@ -420,13 +433,31 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             String[] tagArr = tagStr.split(",");
             for (String tag : tagArr) {
                 try {
-                    FeNameFormat.checkCommonName("workload group tag name", 
tag);
+                    FeNameFormat.checkCommonName("workload group tag", tag);
                 } catch (AnalysisException e) {
-                    throw new DdlException("workload group tag name format is 
illegal, " + tagStr);
+                    throw new DdlException("tag format is illegal, " + tagStr);
                 }
             }
         }
 
+        // internal workload group is usually created by Doris.
+        // If exception happens here, it means thrift not match 
WORKLOAD_TYPE_MAP.
+        String interTypeId = properties.get(WorkloadGroup.INTERNAL_TYPE);
+        if (!StringUtils.isEmpty(interTypeId)) {
+            int wid = Integer.valueOf(interTypeId);
+            if (TWorkloadType.findByValue(wid) == null) {
+                throw new DdlException("error internal type id: " + wid + ", 
current id map:" + WORKLOAD_TYPE_MAP);
+            }
+        }
+
+    }
+
+    /**
+     * Returns the INTERNAL_TYPE property parsed as an integer, or Optional.empty()
+     * when the property is missing or empty. Integer.valueOf throws
+     * NumberFormatException if the stored value is not a valid integer.
+     */
+    Optional<Integer> getInternalTypeId() {
+        String typeIdStr = this.properties.get(INTERNAL_TYPE);
+        if (StringUtils.isEmpty(typeIdStr)) {
+            return Optional.empty();
+        }
+        return Optional.of(Integer.valueOf(typeIdStr));
     }
 
     public long getId() {
@@ -535,8 +566,18 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         return cpuHardLimit;
     }
 
-    public String getTag() {
-        return properties.get(TAG);
+    public Optional<Set<String>> getTag() {
+        String tagStr = properties.get(TAG);
+        if (StringUtils.isEmpty(tagStr)) {
+            return Optional.empty();
+        }
+
+        Set<String> tagSet = new HashSet<>();
+        String[] ss = tagStr.split(",");
+        for (String str : ss) {
+            tagSet.add(str);
+        }
+        return Optional.of(tagSet);
     }
 
     @Override
@@ -550,7 +591,13 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public TopicInfo toTopicInfo() {
         TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
-        tWorkloadGroupInfo.setId(id);
+        long wgId = this.id;
+        Optional<Integer> internalTypeId = getInternalTypeId();
+        if (internalTypeId.isPresent()) {
+            wgId = internalTypeId.get();
+        }
+        tWorkloadGroupInfo.setId(wgId);
+
         tWorkloadGroupInfo.setName(name);
         tWorkloadGroupInfo.setVersion(version);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 8464d83bdbc..26798bb1ec3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -42,6 +42,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadType;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Strings;
@@ -49,7 +50,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -62,6 +62,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -71,6 +72,12 @@ public class WorkloadGroupMgr extends MasterDaemon 
implements Writable, GsonPost
 
     public static final Long DEFAULT_GROUP_ID = 1L;
 
+    public static final String INTERNAL_GROUP_NAME = "_internal";
+
+    // The internal type id is used as the workload group id when the group is
+    // published to BE; see WorkloadGroup.toTopicInfo.
+    public static final Long INTERNAL_TYPE_ID = 
Long.valueOf(TWorkloadType.INTERNAL.getValue());
+
     public static final ImmutableList<String> 
WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
             
.add("Id").add("Name").add(WorkloadGroup.CPU_SHARE).add(WorkloadGroup.MEMORY_LIMIT)
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
@@ -375,44 +382,84 @@ public class WorkloadGroupMgr extends MasterDaemon 
implements Writable, GsonPost
         LOG.info("Create workload group success: {}", workloadGroup);
     }
 
+    /**
+     * Creates the internal workload group (name INTERNAL_GROUP_NAME, cpu_share 100,
+     * INTERNAL_TYPE set to INTERNAL_TYPE_ID). Unless a group with that name is already
+     * registered, it is added to both lookup maps and written to the edit log.
+     */
+    public void createInternalWorkloadGroup() {
+        Map<String, String> properties = Maps.newHashMap();
+        // 100 is cgroup v2 default cpu_share value
+        properties.put(WorkloadGroup.CPU_SHARE, "100");
+        properties.put(WorkloadGroup.INTERNAL_TYPE, 
String.valueOf(INTERNAL_TYPE_ID));
+        // NOTE(review): the id is taken from getNextId() before the existence check below,
+        // so it appears to be consumed even when the group already exists -- confirm this is fine.
+        WorkloadGroup wg = new WorkloadGroup(Env.getCurrentEnv().getNextId(), 
INTERNAL_GROUP_NAME, properties);
+        // the existence check, both map updates and the edit log write happen under the write lock
+        writeLock();
+        try {
+            if (!nameToWorkloadGroup.containsKey(wg.getName())) {
+                nameToWorkloadGroup.put(wg.getName(), wg);
+                idToWorkloadGroup.put(wg.getId(), wg);
+                Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(wg);
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
     // NOTE: used for checking sum value of 100%  for cpu_hard_limit and 
memory_limit
     //  when create/alter workload group with same tag.
     //  when oldWg is null it means caller is an alter stmt.
     private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) 
throws DdlException {
-        String wgTag = newWg.getTag();
-        double sumOfAllMemLimit = 0;
-        int sumOfAllCpuHardLimit = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : 
idToWorkloadGroup.entrySet()) {
-            WorkloadGroup wg = entry.getValue();
-            if (!StringUtils.equals(wgTag, wg.getTag())) {
-                continue;
-            }
+        Optional<Set<String>> newWgTag = newWg.getTag();
+        Set<String> newWgTagSet = null;
+        if (newWgTag.isPresent()) {
+            newWgTagSet = newWgTag.get();
+        } else {
+            newWgTagSet = new HashSet<>();
+            newWgTagSet.add(null);
+        }
 
-            if (oldWg != null && entry.getKey() == oldWg.getId()) {
-                continue;
-            }
+        for (String newWgOneTag : newWgTagSet) {
+            double sumOfAllMemLimit = 0;
+            int sumOfAllCpuHardLimit = 0;
 
-            if (wg.getCpuHardLimit() > 0) {
-                sumOfAllCpuHardLimit += wg.getCpuHardLimit();
-            }
-            if (wg.getMemoryLimitPercent() > 0) {
-                sumOfAllMemLimit += wg.getMemoryLimitPercent();
+            // 1 get sum value of all wg which has same tag without current wg
+            for (Map.Entry<Long, WorkloadGroup> entry : 
idToWorkloadGroup.entrySet()) {
+                WorkloadGroup wg = entry.getValue();
+                Optional<Set<String>> wgTag = wg.getTag();
+
+                if (oldWg != null && entry.getKey() == oldWg.getId()) {
+                    continue;
+                }
+
+                if (newWgOneTag == null) {
+                    if (wgTag.isPresent()) {
+                        continue;
+                    }
+                } else if (!wgTag.isPresent() || 
(!wgTag.get().contains(newWgOneTag))) {
+                    continue;
+                }
+
+                if (wg.getCpuHardLimit() > 0) {
+                    sumOfAllCpuHardLimit += wg.getCpuHardLimit();
+                }
+                if (wg.getMemoryLimitPercent() > 0) {
+                    sumOfAllMemLimit += wg.getMemoryLimitPercent();
+                }
             }
-        }
 
-        sumOfAllMemLimit += newWg.getMemoryLimitPercent();
-        sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
+            // 2 sum current wg value
+            sumOfAllMemLimit += newWg.getMemoryLimitPercent();
+            sumOfAllCpuHardLimit += newWg.getCpuHardLimit();
 
-        if (sumOfAllMemLimit > 100.0 + 1e-6) {
-            throw new DdlException(
-                    "The sum of all workload group " + 
WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
-                            + " cannot be greater than 100.0%.");
-        }
+            // 3 check total sum
+            if (sumOfAllMemLimit > 100.0 + 1e-6) {
+                throw new DdlException(
+                        "The sum of all workload group " + 
WorkloadGroup.MEMORY_LIMIT + " within tag " + (
+                                newWgTag.isPresent() ? newWgTag.get() : "")
+                                + " cannot be greater than 100.0%. current sum 
val:" + sumOfAllMemLimit);
+            }
 
-        if (sumOfAllCpuHardLimit > 100) {
-            throw new DdlException(
-                    "sum of all workload group " + 
WorkloadGroup.CPU_HARD_LIMIT + " within tag "
-                            + wgTag + " can not be greater than 100% ");
+            if (sumOfAllCpuHardLimit > 100) {
+                throw new DdlException(
+                        "sum of all workload group " + 
WorkloadGroup.CPU_HARD_LIMIT + " within tag " + (
+                                newWgTag.isPresent()
+                                        ? newWgTag.get() : "") + " can not be 
greater than 100% ");
+            }
         }
     }
 
@@ -446,8 +493,8 @@ public class WorkloadGroupMgr extends MasterDaemon 
implements Writable, GsonPost
 
     public void dropWorkloadGroup(DropWorkloadGroupStmt stmt) throws 
DdlException {
         String workloadGroupName = stmt.getWorkloadGroupName();
-        if (DEFAULT_GROUP_NAME.equals(workloadGroupName)) {
-            throw new DdlException("Dropping default workload group " + 
workloadGroupName + " is not allowed");
+        if (DEFAULT_GROUP_NAME.equals(workloadGroupName) || 
INTERNAL_GROUP_NAME.equals(workloadGroupName)) {
+            throw new DdlException("Dropping workload group " + 
workloadGroupName + " is not allowed");
         }
 
         // if a workload group exists in user property, it should not be 
dropped
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 5f1e3565966..d729881358e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -235,4 +235,226 @@ public class WorkloadGroupMgrTest {
         }
         Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() 
== 5);
     }
+
+    @Test
+    public void testMultiTagCreateWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+            {
+                String name = "empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                try {
+                    CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all 
workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn100");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g5";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g6";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g7";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn5");
+                try {
+                    CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all 
workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+    }
+
+
+    @Test
+    public void testMultiTagAlterWorkloadGroup() throws UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+            {
+                String name = "empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "50%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                properties.put(WorkloadGroup.TAG, "cn2,cn100");
+                AlterWorkloadGroupStmt alterStmt = new 
AlterWorkloadGroupStmt(name, properties);
+                try {
+                    workloadGroupMgr.alterWorkloadGroup(alterStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all 
workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+    }
+
+
+    @Test
+    public void testMultiTagCreateWorkloadGroupWithNoTag() throws 
UserException {
+        Config.enable_workload_group = true;
+        WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
+
+            {
+                String name = "not_empty_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn1,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "not_empty_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                properties.put(WorkloadGroup.TAG, "cn3,cn2");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            // create workload groups that have no tag
+            {
+                String name = "no_tag_g1";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "10%");
+                properties.put(WorkloadGroup.TAG, "");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "no_tag_g2";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+
+            {
+                String name = "no_tag_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "70%");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                try {
+                    workloadGroupMgr.createWorkloadGroup(createStmt);
+                } catch (DdlException e) {
+                    Assert.assertTrue(e.getMessage().contains("The sum of all 
workload group " + WorkloadGroup.MEMORY_LIMIT));
+                }
+            }
+
+            {
+                String name = "no_tag_g3";
+                Map<String, String> properties = Maps.newHashMap();
+                properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
+                CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
+                workloadGroupMgr.createWorkloadGroup(createStmt);
+            }
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 8e25efdfada..70adbbd7f99 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -153,6 +153,7 @@ public abstract class TestWithFeService {
     @BeforeAll
     public final void beforeAll() throws Exception {
         FeConstants.enableInternalSchemaDb = false;
+        FeConstants.shouldCreateInternalWorkloadGroup = false;
         beforeCreatingConnectContext();
         connectContext = createDefaultCtx();
         beforeCluster();
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index ed0ae243a1d..533999a853f 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -327,6 +327,10 @@ struct TPublishTopicResult {
     1: required Status.TStatus status
 }
 
+enum TWorkloadType {
+    INTERNAL = 2
+}
+
 struct TGetRealtimeExecStatusRequest {
     // maybe query id or other unique id
     1: optional Types.TUniqueId id
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 41cc190a017..7807578ea81 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -176,6 +176,30 @@ suite("test_crud_wlg") {
         exception "can not be greater than 100%"
     }
 
+    // test alter tag and type
+    test {
+        sql "alter workload group test_group properties ( 'internal_type'='13' 
);"
+
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "create workload group inter_wg properties('internal_type'='123');"
+        exception "internal_type can not be create or modified"
+    }
+
+    test {
+        sql "alter workload group normal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
+    test {
+        sql "alter workload group _internal properties ('tag'='123')"
+
+        exception "_internal and normal group can not set tag"
+    }
+
     sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' 
);"
     qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
     qt_cpu_hard_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from information_schema.workload_groups where name in ('normal','test_group') 
order by name;"
@@ -492,6 +516,11 @@ suite("test_crud_wlg") {
 
 
     // test workload group's tag property, cpu_hard_limit
+    test {
+        sql "create workload group tag_test properties('tag'=' a, b , c ');"
+        exception "tag format is illegal"
+    }
+
     test {
         sql "create workload group if not exists tag1_wg1 properties (  
'cpu_hard_limit'='101%', 'tag'='tag1')"
         exception "must be a positive integer"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to