yiguolei commented on code in PR #44690: URL: https://github.com/apache/doris/pull/44690#discussion_r1926942966
########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -541,4 +467,136 @@ void ScannerContext::update_peak_running_scanner(int num) { _local_state->_peak_running_scanner->add(num); } +int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock, + std::unique_lock<std::shared_mutex>& scheduler_lock) { + // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks. + int32_t margin_1 = _min_concurrency - (_tasks_queue.size() + _num_scheduled_scanners); + + // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. + int32_t margin_2 = + _min_concurrency_of_scan_scheduler - + (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); + + if (margin_1 <= 0 && margin_2 <= 0) { + return 0; + } + + int32_t margin = std::max(margin_1, margin_2); + margin = std::min(margin, _basic_margin); + + VLOG_DEBUG << fmt::format( + "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " + "({} + {}), margin: {}", + print_id(_query_id), ctx_id, margin_1, _min_concurrency, _tasks_queue.size(), + _num_scheduled_scanners, margin_2, _min_concurrency_of_scan_scheduler, + _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin); + + return margin; +} + +// This function must be called with: +// 1. _transfer_lock held. +// 2. SimplifiedScanScheduler::_lock held. +Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, + std::unique_lock<std::mutex>& transfer_lock, + std::unique_lock<std::shared_mutex>& scheduler_lock) { + std::list<std::shared_ptr<ScanTask>> tasks_to_submit; + + int32_t margin = _get_margin(transfer_lock, scheduler_lock); + + // margin is less than zero. Means this scan operator could not submit any scan task for now. + if (margin <= 0) { + // Be careful with current scan task. 
+ // We need to add it back to task queue to make sure it could be resubmitted. + if (current_scan_task && current_scan_task->cached_blocks.empty() && Review Comment: current_scan_task->cached_blocks.empty() 这应该是一个check,不应该是一个if (English: `current_scan_task->cached_blocks.empty()` should be a check — i.e. an assertion of an invariant — rather than a condition in an `if`.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org