yiguolei commented on code in PR #44690:
URL: https://github.com/apache/doris/pull/44690#discussion_r1926416383


##########
be/src/vec/exec/scan/scanner_context.cpp:
##########
@@ -147,87 +153,72 @@ Status ScannerContext::init() {
         }
     }
 
-    // _scannner_scheduler will be used to submit scan task.
-    // file_scan_operator currentlly has performance issue if we submit too 
many scan tasks to scheduler.
-    // we should fix this problem in the future.
-    if (_scanner_scheduler->get_queue_size() * 2 > 
config::doris_scanner_thread_pool_queue_size ||
-        _is_file_scan_operator) {
-        submit_many_scan_tasks_for_potential_performance_issue = false;
-    }
-
-    // _max_thread_num controls how many scanners of this ScanOperator can be 
submitted to scheduler at a time.
     // The overall target of our system is to make full utilization of the 
resources.
     // At the same time, we dont want too many tasks are queued by scheduler, 
that is not necessary.
-    // So, first of all, we try to make sure _max_thread_num of a ScanNode of 
a query on a single backend is less than
-    // 2 * config::doris_scanner_thread_pool_thread_num, so that we can make 
all io threads busy.
-    // For example, on a 64-core machine, the default value of 
config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
-    // and the num_parallel_instances of this scan operator will be 64/2=32.
-    // For a query who has one scan nodes, the _max_thread_num of each scan 
node instance will be 4 * 128 / 32 = 16.
-    // We have 32 instances of this scan operator, so for the ScanNode, we 
have 16 * 32 = 8 * 64 = 512 scanner tasks can be submitted at a time.
-    _max_thread_num = _state->num_scanner_threads() > 0 ? 
_state->num_scanner_threads() : 0;
-
-    if (_max_thread_num == 0) {
-        // NOTE: When ignore_data_distribution is true, the parallelism
-        // of the scan operator is regarded as 1 (actually maybe not).
-        // That will make the number of scan task can be submitted to the 
scheduler
-        // in a vary large value. This logicl is kept from the older 
implementation.
-        if (submit_many_scan_tasks_for_potential_performance_issue || 
_ignore_data_distribution) {
-            _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
+    if (_max_concurrency = _state->num_scanner_threads(); _max_concurrency == 
0) {
+        if (_serial_scan_operator) {
+            // If the scan operator is serial, we need to boost the 
concurrency to ensure a single scan operator
+            // could make full utilization of the resource.
+            _max_concurrency = _min_concurrency_of_scan_scheduler;
         } else {
-            const size_t factor = _is_file_scan_operator ? 1 : 4;
-            _max_thread_num = factor * 
(config::doris_scanner_thread_pool_thread_num /
-                                        num_parallel_instances);
+            _max_concurrency = _min_concurrency_of_scan_scheduler / 
num_parallel_instances;
             // In some rare cases, user may set num_parallel_instances to 1 
handly to make many query could be executed
-            // in parallel. We need to make sure the _max_thread_num is 
smaller than previous value.
-            _max_thread_num =
-                    std::min(_max_thread_num, 
config::doris_scanner_thread_pool_thread_num);
+            // in parallel. We need to make sure the _max_thread_num is 
smaller than previous value in this situation.
+            _max_concurrency =
+                    std::min(_max_concurrency, 
config::doris_scanner_thread_pool_thread_num);
         }
+        _max_concurrency = _max_concurrency == 0 ? 1 : _max_concurrency;
     }
 
-    _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
-    // In some situation, there are not too many big tablets involed, so we 
can reduce the thread number.
-    // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
-    _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());
-
-    // 1. Calculate max concurrency
-    // For select * from table limit 10; should just use one thread.
-    if (_local_state->should_run_serial()) {
-        _max_thread_num = 1;
-    }
+    _max_concurrency = std::min(_max_concurrency, 
(int32_t)_all_scanners.size());
 
     // when user not specify scan_thread_num, so we can try downgrade 
_max_thread_num.
     // becaue we found in a table with 5k columns, column reader may ocuppy 
too much memory.
     // you can refer https://github.com/apache/doris/issues/35340 for details.
     int32_t max_column_reader_num = 
_state->query_options().max_column_reader_num;
-    if (_max_thread_num != 1 && max_column_reader_num > 0) {
+
+    if (_max_concurrency != 1 && max_column_reader_num > 0) {
         int32_t scan_column_num = _output_tuple_desc->slots().size();
-        int32_t current_column_num = scan_column_num * _max_thread_num;
+        int32_t current_column_num = scan_column_num * _max_concurrency;
         if (current_column_num > max_column_reader_num) {
             int32_t new_max_thread_num = max_column_reader_num / 
scan_column_num;
             new_max_thread_num = new_max_thread_num <= 0 ? 1 : 
new_max_thread_num;
-            if (new_max_thread_num < _max_thread_num) {
-                int32_t origin_max_thread_num = _max_thread_num;
-                _max_thread_num = new_max_thread_num;
+            if (new_max_thread_num < _max_concurrency) {
+                int32_t origin_max_thread_num = _max_concurrency;
+                _max_concurrency = new_max_thread_num;
                 LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                           << " scan's max_thread_num from " << 
origin_max_thread_num << " to "
-                          << _max_thread_num << ",column num: " << 
scan_column_num
+                          << _max_concurrency << ",column num: " << 
scan_column_num
                           << ", max_column_reader_num: " << 
max_column_reader_num;
             }
         }
     }
 
-    COUNTER_SET(_local_state->_max_scanner_thread_num, 
(int64_t)_max_thread_num);
+    // Each scan operator can submit _basic_margin scanner to scheduelr if 
scheduler has enough resource.
+    // So that for a single query, we can make sure it could make full 
utilization of the resource.
+    _basic_margin = _serial_scan_operator ? _max_concurrency
+                                          : _min_concurrency_of_scan_scheduler 
/
+                                                    
(_state->query_parallel_instance_num());
 
-    // submit `_max_thread_num` running scanners to `ScannerScheduler`
-    // When a running scanners is finished, it will submit one of the 
remaining scanners.
-    for (int i = 0; i < _max_thread_num; ++i) {
-        std::weak_ptr<ScannerDelegate> next_scanner;
-        if (_scanners.try_dequeue(next_scanner)) {
-            
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
-            _num_running_scanners++;
-        }
+    // Make sure the _basic_margin is not too large.
+    _basic_margin = std::min(_basic_margin, _max_concurrency);
+
+    // For select * from table limit 10; should just use one thread.
+    if (_local_state->should_run_serial()) {

Review Comment:
   这里的serial 又是并发度很低了
   (English: "serial" here, once again, means the concurrency is very low.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to