yiguolei commented on code in PR #44690:
URL: https://github.com/apache/doris/pull/44690#discussion_r1926942966


##########
be/src/vec/exec/scan/scanner_context.cpp:
##########
@@ -541,4 +467,136 @@ void ScannerContext::update_peak_running_scanner(int num) 
{
     _local_state->_peak_running_scanner->add(num);
 }
 
+int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& 
transfer_lock,
+                                    std::unique_lock<std::shared_mutex>& 
scheduler_lock) {
+    // margin_1 is used to ensure each scan operator could have at least 
_min_scan_concurrency scan tasks.
+    int32_t margin_1 = _min_concurrency - (_tasks_queue.size() + 
_num_scheduled_scanners);
+
+    // margin_2 is used to ensure the scan scheduler could have at least 
_min_scan_concurrency_of_scan_scheduler scan tasks.
+    int32_t margin_2 =
+            _min_concurrency_of_scan_scheduler -
+            (_scanner_scheduler->get_active_threads() + 
_scanner_scheduler->get_queue_size());
+
+    if (margin_1 <= 0 && margin_2 <= 0) {
+        return 0;
+    }
+
+    int32_t margin = std::max(margin_1, margin_2);
+    margin = std::min(margin, _basic_margin);
+
+    VLOG_DEBUG << fmt::format(
+            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), 
margin_2: {} = {} - "
+            "({} + {}), margin: {}",
+            print_id(_query_id), ctx_id, margin_1, _min_concurrency, 
_tasks_queue.size(),
+            _num_scheduled_scanners, margin_2, 
_min_concurrency_of_scan_scheduler,
+            _scanner_scheduler->get_active_threads(), 
_scanner_scheduler->get_queue_size(), margin);
+
+    return margin;
+}
+
+// This function must be called with:
+// 1. _transfer_lock held.
+// 2. SimplifiedScanScheduler::_lock held.
+Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> 
current_scan_task,
+                                           std::unique_lock<std::mutex>& 
transfer_lock,
+                                           
std::unique_lock<std::shared_mutex>& scheduler_lock) {
+    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
+
+    int32_t margin = _get_margin(transfer_lock, scheduler_lock);
+
+    // margin is less than zero. Means this scan operator could not submit any 
scan task for now.
+    if (margin <= 0) {
+        // Be careful with current scan task.
+        // We need to add it back to task queue to make sure it could be 
resubmitted.
+        if (current_scan_task && current_scan_task->cached_blocks.empty() &&

Review Comment:
   `current_scan_task->cached_blocks.empty()` 这里应该是一个 check(断言),而不应该作为 if 的判断条件。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to