This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f937cb01 (performance)[scanner] Isolate local and remote queries using different scanner… (#11006)
d6f937cb01 is described below

commit d6f937cb01821c452abd3ebb096a9c9428fc01c7
Author: Luwei <814383...@qq.com>
AuthorDate: Fri Jul 29 19:14:46 2022 +0800

    (performance)[scanner] Isolate local and remote queries using different scanner… (#11006)
---
 be/src/common/config.h              |  5 +++++
 be/src/exec/olap_scan_node.cpp      | 12 +++++++++++-
 be/src/exec/olap_scanner.cpp        | 17 +++++++++++++++++
 be/src/exec/olap_scanner.h          |  2 ++
 be/src/olap/tablet.h                |  2 ++
 be/src/runtime/exec_env.h           |  2 ++
 be/src/runtime/exec_env_init.cpp    |  5 +++++
 be/src/vec/exec/volap_scan_node.cpp | 12 +++++++++++-
 be/src/vec/exec/volap_scanner.cpp   | 17 +++++++++++++++++
 be/src/vec/exec/volap_scanner.h     |  2 ++
 10 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 36cfeb684d..6f7342bb2f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -805,6 +805,11 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");
 
 CONF_Bool(enable_time_lut, "true");
 
+// number of threads in the remote (s3) scanner thread pool
+CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "16");
+// queue size of the remote (s3) scanner thread pool
+CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index ce3486a56d..cabf14ae62 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1502,6 +1502,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
      * 4. Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries
      *********************************/
     PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+    PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool();
     _total_assign_num = 0;
     _nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5);
     std::list<OlapScanner*> olap_scanners;
@@ -1580,8 +1581,17 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
                 task.priority = _nice;
                 task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
                 (*iter)->start_wait_worker_timer();
+
+                TabletStorageType type = (*iter)->get_storage_type();
+                bool ret = false;
                 COUNTER_UPDATE(_scanner_sched_counter, 1);
-                if (thread_pool->offer(task)) {
+                if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+                    ret = thread_pool->offer(task);
+                } else {
+                    ret = remote_thread_pool->offer(task);
+                }
+
+                if (ret) {
                     olap_scanners.erase(iter++);
                 } else {
                     LOG(FATAL) << "Failed to assign scanner task to thread pool!";
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index e49b322652..97d289c3f4 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -127,6 +127,23 @@ Status OlapScanner::prepare(
     return Status::OK();
 }
 
+TabletStorageType OlapScanner::get_storage_type() {
+    int local_reader = 0;
+    for (const auto& reader : _tablet_reader_params.rs_readers) {
+        if (reader->rowset()->rowset_meta()->resource_id().empty()) {
+            local_reader++;
+        }
+    }
+    int total_reader = _tablet_reader_params.rs_readers.size();
+
+    if (local_reader == total_reader) {
+        return TabletStorageType::STORAGE_TYPE_LOCAL;
+    } else if (local_reader == 0) {
+        return TabletStorageType::STORAGE_TYPE_REMOTE;
+    }
+    return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL;
+}
+
 Status OlapScanner::open() {
     auto span = _runtime_state->get_tracer()->StartSpan("OlapScanner::open");
     auto scope = opentelemetry::trace::Scope {span};
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 44fab43dc6..e95e31d106 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -88,6 +88,8 @@ public:
 
     const std::vector<SlotDescriptor*>& get_query_slots() const { return _query_slots; }
 
+    TabletStorageType get_storage_type();
+
 protected:
     Status _init_tablet_reader_params(
             const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index accc157d6a..adacb11eb2 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -55,6 +55,8 @@ struct RowsetWriterContext;
 
 using TabletSharedPtr = std::shared_ptr<Tablet>;
 
+enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL };
+
 class Tablet : public BaseTablet {
 public:
     static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 2f08c72365..d879b417e0 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -120,6 +120,7 @@ public:
     MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
+    PriorityThreadPool* remote_scan_thread_pool() { return _remote_scan_thread_pool; }
     ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
     PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
@@ -201,6 +202,7 @@ private:
 
     // TODO(cmy): find a better way to unify these 2 pools.
     PriorityThreadPool* _scan_thread_pool = nullptr;
+    PriorityThreadPool* _remote_scan_thread_pool = nullptr;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index b574fd75b1..afecca4159 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -114,6 +114,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
         LOG(INFO) << "scan thread pool use PriorityThreadPool";
     }
 
+    _remote_scan_thread_pool =
+            new PriorityThreadPool(config::doris_remote_scanner_thread_pool_thread_num,
+                                   config::doris_remote_scanner_thread_pool_queue_size);
+
     ThreadPoolBuilder("LimitedScanThreadPool")
             .set_min_threads(1)
             .set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -347,6 +351,7 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_cgroups_mgr);
     SAFE_DELETE(_etl_thread_pool);
     SAFE_DELETE(_scan_thread_pool);
+    SAFE_DELETE(_remote_scan_thread_pool);
     SAFE_DELETE(_thread_mgr);
     SAFE_DELETE(_broker_client_cache);
     SAFE_DELETE(_frontend_client_cache);
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 2e86210a50..993475cfec 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -1878,6 +1878,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
     // post volap scanners to thread-pool
     PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
     auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
+    PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool();
     auto iter = olap_scanners.begin();
     while (iter != olap_scanners.end()) {
         PriorityThreadPool::Task task;
@@ -1888,8 +1889,17 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
         task.priority = _nice;
         task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
         (*iter)->start_wait_worker_timer();
+
+        TabletStorageType type = (*iter)->get_storage_type();
+        bool ret = false;
         COUNTER_UPDATE(_scanner_sched_counter, 1);
-        if (thread_pool->offer(task)) {
+        if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+            ret = thread_pool->offer(task);
+        } else {
+            ret = remote_thread_pool->offer(task);
+        }
+
+        if (ret) {
             olap_scanners.erase(iter++);
         } else {
             LOG(FATAL) << "Failed to assign scanner task to thread pool!";
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index e9c9b43732..30cfa8861c 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -137,6 +137,23 @@ Status VOlapScanner::open() {
     return Status::OK();
 }
 
+TabletStorageType VOlapScanner::get_storage_type() {
+    int local_reader = 0;
+    for (const auto& reader : _tablet_reader_params.rs_readers) {
+        if (reader->rowset()->rowset_meta()->resource_id().empty()) {
+            local_reader++;
+        }
+    }
+    int total_reader = _tablet_reader_params.rs_readers.size();
+
+    if (local_reader == total_reader) {
+        return TabletStorageType::STORAGE_TYPE_LOCAL;
+    } else if (local_reader == 0) {
+        return TabletStorageType::STORAGE_TYPE_REMOTE;
+    }
+    return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL;
+}
+
 // it will be called under tablet read lock because capture rs readers need
 Status VOlapScanner::_init_tablet_reader_params(
         const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index 1510ec4d4c..dbede5c4e7 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -89,6 +89,8 @@ public:
 
     std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; }
 
+    TabletStorageType get_storage_type();
+
 private:
     Status _init_tablet_reader_params(
             const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to