yiguolei commented on code in PR #50817:
URL: https://github.com/apache/doris/pull/50817#discussion_r2126319761


##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -486,99 +506,94 @@ void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
     }
 }
 
-void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
-                                               std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr) {
+Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                                 std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr) {
+    Status upsert_ret = Status::OK();
     uint64_t wg_id = wg_info->id;
     std::string wg_name = wg_info->name;
+    int pipeline_exec_thread_num = wg_info->pipeline_exec_thread_num;
     int scan_thread_num = wg_info->scan_thread_num;
     int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
     int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
+    int max_flush_thread_num = wg_info->max_flush_thread_num;
+    int min_flush_thread_num = wg_info->min_flush_thread_num;
+
+    // 1 create thread pool
     if (_task_sched == nullptr) {
-        int32_t executors_size = config::pipeline_executor_size;
-        if (executors_size <= 0) {
-            executors_size = CpuInfo::num_cores();
-        }
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(executors_size, 
"Pipe_" + wg_name,
+                
std::make_unique<pipeline::TaskScheduler>(pipeline_exec_thread_num, "p_" + 
wg_name,
                                                           cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << wg_id;
         }
     }
 
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ wg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("ls_" + 
wg_name,
                                                                       
cg_cpu_ctl_ptr, wg_name);
-        Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
-                                           
config::doris_scanner_thread_pool_thread_num,
+        Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
                                            
config::doris_scanner_thread_pool_queue_size);
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << wg_id;
         }
     }
-    if (scan_thread_num > 0 && _scan_task_sched) {
-        _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
-    }
 
     if (_remote_scan_task_sched == nullptr) {
-        int remote_max_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
         int remote_scan_thread_queue_size =
                 
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ wg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("rs_" + 
wg_name,
                                                                       
cg_cpu_ctl_ptr, wg_name);
-        Status ret = remote_scan_scheduler->start(remote_max_thread_num,
-                                                  
config::doris_scanner_min_thread_pool_thread_num,
-                                                  
remote_scan_thread_queue_size);
+        Status ret =
+                remote_scan_scheduler->start(max_remote_scan_thread_num, 
min_remote_scan_thread_num,
+                                             remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start 
failed, gid="
                       << wg_id;
         }
     }
+
+    if (_memtable_flush_pool == nullptr) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        std::string pool_name = "mf_" + wg_name;
+        auto ret = ThreadPoolBuilder(pool_name)
+                           .set_min_threads(min_flush_thread_num)

Review Comment:
   如果 normal wg 是默认创建的,那么 memtable flush executor 里的那个 thread pool 在默认初始化时,应该从 wg manager 里获取 normal wg 的 memtable flush pool。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to