yiguolei commented on code in PR #44690: URL: https://github.com/apache/doris/pull/44690#discussion_r1926416383
########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -147,87 +153,72 @@ Status ScannerContext::init() { } } - // _scannner_scheduler will be used to submit scan task. - // file_scan_operator currentlly has performance issue if we submit too many scan tasks to scheduler. - // we should fix this problem in the future. - if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size || - _is_file_scan_operator) { - submit_many_scan_tasks_for_potential_performance_issue = false; - } - - // _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time. // The overall target of our system is to make full utilization of the resources. // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary. - // So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than - // 2 * config::doris_scanner_thread_pool_thread_num, so that we can make all io threads busy. - // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128. - // and the num_parallel_instances of this scan operator will be 64/2=32. - // For a query who has one scan nodes, the _max_thread_num of each scan node instance will be 4 * 128 / 32 = 16. - // We have 32 instances of this scan operator, so for the ScanNode, we have 16 * 32 = 8 * 64 = 512 scanner tasks can be submitted at a time. - _max_thread_num = _state->num_scanner_threads() > 0 ? _state->num_scanner_threads() : 0; - - if (_max_thread_num == 0) { - // NOTE: When ignore_data_distribution is true, the parallelism - // of the scan operator is regarded as 1 (actually maybe not). - // That will make the number of scan task can be submitted to the scheduler - // in a vary large value. This logicl is kept from the older implementation. 
- if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) { - _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1; + if (_max_concurrency = _state->num_scanner_threads(); _max_concurrency == 0) { + if (_serial_scan_operator) { + // If the scan operator is serial, we need to boost the concurrency to ensure a single scan operator + // could make full utilization of the resource. + _max_concurrency = _min_concurrency_of_scan_scheduler; } else { - const size_t factor = _is_file_scan_operator ? 1 : 4; - _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num / - num_parallel_instances); + _max_concurrency = _min_concurrency_of_scan_scheduler / num_parallel_instances; // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed - // in parallel. We need to make sure the _max_thread_num is smaller than previous value. - _max_thread_num = - std::min(_max_thread_num, config::doris_scanner_thread_pool_thread_num); + // in parallel. We need to make sure the _max_thread_num is smaller than previous value in this situation. + _max_concurrency = + std::min(_max_concurrency, config::doris_scanner_thread_pool_thread_num); } + _max_concurrency = _max_concurrency == 0 ? 1 : _max_concurrency; } - _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; - // In some situation, there are not too many big tablets involed, so we can reduce the thread number. - // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0. - _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size()); - - // 1. Calculate max concurrency - // For select * from table limit 10; should just use one thread. - if (_local_state->should_run_serial()) { - _max_thread_num = 1; - } + _max_concurrency = std::min(_max_concurrency, (int32_t)_all_scanners.size()); // when user not specify scan_thread_num, so we can try downgrade _max_thread_num. 
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory. // you can refer https://github.com/apache/doris/issues/35340 for details. int32_t max_column_reader_num = _state->query_options().max_column_reader_num; - if (_max_thread_num != 1 && max_column_reader_num > 0) { + + if (_max_concurrency != 1 && max_column_reader_num > 0) { int32_t scan_column_num = _output_tuple_desc->slots().size(); - int32_t current_column_num = scan_column_num * _max_thread_num; + int32_t current_column_num = scan_column_num * _max_concurrency; if (current_column_num > max_column_reader_num) { int32_t new_max_thread_num = max_column_reader_num / scan_column_num; new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num; - if (new_max_thread_num < _max_thread_num) { - int32_t origin_max_thread_num = _max_thread_num; - _max_thread_num = new_max_thread_num; + if (new_max_thread_num < _max_concurrency) { + int32_t origin_max_thread_num = _max_concurrency; + _max_concurrency = new_max_thread_num; LOG(INFO) << "downgrade query:" << print_id(_state->query_id()) << " scan's max_thread_num from " << origin_max_thread_num << " to " - << _max_thread_num << ",column num: " << scan_column_num + << _max_concurrency << ",column num: " << scan_column_num << ", max_column_reader_num: " << max_column_reader_num; } } } - COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num); + // Each scan operator can submit _basic_margin scanner to scheduelr if scheduler has enough resource. + // So that for a single query, we can make sure it could make full utilization of the resource. + _basic_margin = _serial_scan_operator ? _max_concurrency + : _min_concurrency_of_scan_scheduler / + (_state->query_parallel_instance_num()); - // submit `_max_thread_num` running scanners to `ScannerScheduler` - // When a running scanners is finished, it will submit one of the remaining scanners. 
- for (int i = 0; i < _max_thread_num; ++i) { - std::weak_ptr<ScannerDelegate> next_scanner; - if (_scanners.try_dequeue(next_scanner)) { - RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner))); - _num_running_scanners++; - } + // Make sure the _basic_margin is not too large. + _basic_margin = std::min(_basic_margin, _max_concurrency); + + // For select * from table limit 10; should just use one thread. + if (_local_state->should_run_serial()) { Review Comment: 这里的serial 又是并发度很低了 (English: "Here, 'serial' once again means very low concurrency" — i.e. unlike the `_serial_scan_operator` case above, where serial boosts concurrency, `should_run_serial()` forces it down.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org