yiguolei commented on code in PR #50817:
URL: https://github.com/apache/doris/pull/50817#discussion_r2126292296


##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -486,99 +506,94 @@ void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
     }
 }
 
-void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
-                                               std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr) {
+Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+                                                 std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr) {
+    Status upsert_ret = Status::OK();
     uint64_t wg_id = wg_info->id;
     std::string wg_name = wg_info->name;
+    int pipeline_exec_thread_num = wg_info->pipeline_exec_thread_num;
     int scan_thread_num = wg_info->scan_thread_num;
     int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
     int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
+    int max_flush_thread_num = wg_info->max_flush_thread_num;
+    int min_flush_thread_num = wg_info->min_flush_thread_num;
+
+    // 1 create thread pool
     if (_task_sched == nullptr) {
-        int32_t executors_size = config::pipeline_executor_size;
-        if (executors_size <= 0) {
-            executors_size = CpuInfo::num_cores();
-        }
         std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
-                std::make_unique<pipeline::TaskScheduler>(executors_size, 
"Pipe_" + wg_name,
+                
std::make_unique<pipeline::TaskScheduler>(pipeline_exec_thread_num, "p_" + 
wg_name,
                                                           cg_cpu_ctl_ptr);
         Status ret = pipeline_task_scheduler->start();
         if (ret.ok()) {
             _task_sched = std::move(pipeline_task_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, 
gid= " << wg_id;
         }
     }
 
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ wg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("ls_" + 
wg_name,
                                                                       
cg_cpu_ctl_ptr, wg_name);
-        Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
-                                           
config::doris_scanner_thread_pool_thread_num,
+        Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
                                            
config::doris_scanner_thread_pool_queue_size);
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, 
gid=" << wg_id;
         }
     }
-    if (scan_thread_num > 0 && _scan_task_sched) {
-        _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
-    }
 
     if (_remote_scan_task_sched == nullptr) {
-        int remote_max_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
         int remote_scan_thread_queue_size =
                 
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ wg_name,
+                std::make_unique<vectorized::SimplifiedScanScheduler>("rs_" + 
wg_name,
                                                                       
cg_cpu_ctl_ptr, wg_name);
-        Status ret = remote_scan_scheduler->start(remote_max_thread_num,
-                                                  
config::doris_scanner_min_thread_pool_thread_num,
-                                                  
remote_scan_thread_queue_size);
+        Status ret =
+                remote_scan_scheduler->start(max_remote_scan_thread_num, 
min_remote_scan_thread_num,
+                                             remote_scan_thread_queue_size);
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
+            upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start 
failed, gid="
                       << wg_id;
         }
     }
+
+    if (_memtable_flush_pool == nullptr) {

Review Comment:
   如果 `_memtable_flush_pool` 不是 nullptr，这种情况是怎么处理的？



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to