This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d6f937cb01 (performance)[scanner] Isolate local and remote queries using different scanner… (#11006) d6f937cb01 is described below commit d6f937cb01821c452abd3ebb096a9c9428fc01c7 Author: Luwei <814383...@qq.com> AuthorDate: Fri Jul 29 19:14:46 2022 +0800 (performance)[scanner] Isolate local and remote queries using different scanner… (#11006) --- be/src/common/config.h | 5 +++++ be/src/exec/olap_scan_node.cpp | 12 +++++++++++- be/src/exec/olap_scanner.cpp | 17 +++++++++++++++++ be/src/exec/olap_scanner.h | 2 ++ be/src/olap/tablet.h | 2 ++ be/src/runtime/exec_env.h | 2 ++ be/src/runtime/exec_env_init.cpp | 5 +++++ be/src/vec/exec/volap_scan_node.cpp | 12 +++++++++++- be/src/vec/exec/volap_scanner.cpp | 17 +++++++++++++++++ be/src/vec/exec/volap_scanner.h | 2 ++ 10 files changed, 74 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 36cfeb684d..6f7342bb2f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -805,6 +805,11 @@ CONF_Int32(s3_transfer_executor_pool_size, "2"); CONF_Bool(enable_time_lut, "true"); +// number of s3 scanner thread pool size +CONF_Int32(doris_remote_scanner_thread_pool_thread_num, "16"); +// number of s3 scanner thread pool queue size +CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240"); + #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index ce3486a56d..cabf14ae62 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1502,6 +1502,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { * 4. 
Regularly increase the priority of the remaining tasks in the queue to avoid starvation for large queries *********************************/ PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); + PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); _total_assign_num = 0; _nice = 18 + std::max(0, 2 - (int)_olap_scanners.size() / 5); std::list<OlapScanner*> olap_scanners; @@ -1580,8 +1581,17 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { task.priority = _nice; task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk()); (*iter)->start_wait_worker_timer(); + + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; COUNTER_UPDATE(_scanner_sched_counter, 1); - if (thread_pool->offer(task)) { + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + ret = thread_pool->offer(task); + } else { + ret = remote_thread_pool->offer(task); + } + + if (ret) { olap_scanners.erase(iter++); } else { LOG(FATAL) << "Failed to assign scanner task to thread pool!"; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index e49b322652..97d289c3f4 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -127,6 +127,23 @@ Status OlapScanner::prepare( return Status::OK(); } +TabletStorageType OlapScanner::get_storage_type() { + int local_reader = 0; + for (const auto& reader : _tablet_reader_params.rs_readers) { + if (reader->rowset()->rowset_meta()->resource_id().empty()) { + local_reader++; + } + } + int total_reader = _tablet_reader_params.rs_readers.size(); + + if (local_reader == total_reader) { + return TabletStorageType::STORAGE_TYPE_LOCAL; + } else if (local_reader == 0) { + return TabletStorageType::STORAGE_TYPE_REMOTE; + } + return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; +} + Status OlapScanner::open() { auto span = _runtime_state->get_tracer()->StartSpan("OlapScanner::open"); auto scope = opentelemetry::trace::Scope {span}; 
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index 44fab43dc6..e95e31d106 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -88,6 +88,8 @@ public: const std::vector<SlotDescriptor*>& get_query_slots() const { return _query_slots; } + TabletStorageType get_storage_type(); + protected: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index accc157d6a..adacb11eb2 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -55,6 +55,8 @@ struct RowsetWriterContext; using TabletSharedPtr = std::shared_ptr<Tablet>; +enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL }; + class Tablet : public BaseTablet { public: static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 2f08c72365..d879b417e0 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -120,6 +120,7 @@ public: MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } ThreadResourceMgr* thread_mgr() { return _thread_mgr; } PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; } + PriorityThreadPool* remote_scan_thread_pool() { return _remote_scan_thread_pool; } ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); } PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; } ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } @@ -201,6 +202,7 @@ private: // TODO(cmy): find a better way to unify these 2 pools. 
PriorityThreadPool* _scan_thread_pool = nullptr; + PriorityThreadPool* _remote_scan_thread_pool = nullptr; std::unique_ptr<ThreadPool> _limited_scan_thread_pool; std::unique_ptr<ThreadPool> _send_batch_thread_pool; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index b574fd75b1..afecca4159 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -114,6 +114,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { LOG(INFO) << "scan thread pool use PriorityThreadPool"; } + _remote_scan_thread_pool = + new PriorityThreadPool(config::doris_remote_scanner_thread_pool_thread_num, + config::doris_remote_scanner_thread_pool_queue_size); + ThreadPoolBuilder("LimitedScanThreadPool") .set_min_threads(1) .set_max_threads(config::doris_scanner_thread_pool_thread_num) @@ -347,6 +351,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_cgroups_mgr); SAFE_DELETE(_etl_thread_pool); SAFE_DELETE(_scan_thread_pool); + SAFE_DELETE(_remote_scan_thread_pool); SAFE_DELETE(_thread_mgr); SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 2e86210a50..993475cfec 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -1878,6 +1878,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per // post volap scanners to thread-pool PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool(); auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan(); + PriorityThreadPool* remote_thread_pool = state->exec_env()->remote_scan_thread_pool(); auto iter = olap_scanners.begin(); while (iter != olap_scanners.end()) { PriorityThreadPool::Task task; @@ -1888,8 +1889,17 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per task.priority = _nice; task.queue_id = 
state->exec_env()->store_path_to_index((*iter)->scan_disk()); (*iter)->start_wait_worker_timer(); + + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; COUNTER_UPDATE(_scanner_sched_counter, 1); - if (thread_pool->offer(task)) { + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + ret = thread_pool->offer(task); + } else { + ret = remote_thread_pool->offer(task); + } + + if (ret) { olap_scanners.erase(iter++); } else { LOG(FATAL) << "Failed to assign scanner task to thread pool!"; diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index e9c9b43732..30cfa8861c 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -137,6 +137,23 @@ Status VOlapScanner::open() { return Status::OK(); } +TabletStorageType VOlapScanner::get_storage_type() { + int local_reader = 0; + for (const auto& reader : _tablet_reader_params.rs_readers) { + if (reader->rowset()->rowset_meta()->resource_id().empty()) { + local_reader++; + } + } + int total_reader = _tablet_reader_params.rs_readers.size(); + + if (local_reader == total_reader) { + return TabletStorageType::STORAGE_TYPE_LOCAL; + } else if (local_reader == 0) { + return TabletStorageType::STORAGE_TYPE_REMOTE; + } + return TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; +} + // it will be called under tablet read lock because capture rs readers need Status VOlapScanner::_init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index 1510ec4d4c..dbede5c4e7 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -89,6 +89,8 @@ public: std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; } + TabletStorageType get_storage_type(); + private: Status _init_tablet_reader_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& 
filters, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org