This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 6e40592ca1  [bug](scanner) Improve limit query performance on olapScannode and avoid infinite loop (#11301)
6e40592ca1 is described below

commit 6e40592ca1d1eb8c53d0ad0a3a0e2be15a4dfcdf
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon Aug 1 13:50:12 2022 +0800

    [bug](scanner) Improve limit query performance on olapScannode and avoid infinite loop (#11301)

    1. Fix a bug where querying a table with large columns may cause an infinite loop
    2. Optimize the query logic with limit: when the limit value is relatively small, reduce the
       parallelism of the scanner to cut unnecessary resource consumption, increase the number of
       similar queries the system can serve concurrently, and improve query speed by more than 60%
---
 be/src/exec/olap_scan_node.cpp       | 19 +++++++++++---
 be/src/exec/olap_scan_node.h         |  2 ++
 be/src/exec/olap_scanner.cpp         | 18 +++++++++----
 be/src/exec/olap_scanner.h           |  4 +++
 be/src/runtime/fragment_mgr.cpp      |  9 +++++++
 be/src/runtime/query_fragments_ctx.h | 11 ++++++++
 be/src/runtime/row_batch.cpp         |  2 +-
 be/src/vec/exec/volap_scan_node.cpp  | 50 ++++++++++++++++++++++++++----------
 8 files changed, 92 insertions(+), 23 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index b3e63ee8cd..9728922a0a 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -92,7 +92,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
     }
-
+    _batch_size = _limit == -1 ? state->batch_size()
+                               : std::min(static_cast<int64_t>(state->batch_size()), _limit);
     return Status::OK();
 }
@@ -260,7 +261,6 @@ Status OlapScanNode::open(RuntimeState* state) {
 Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-
     // check if Canceled.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_row_batches_lock);
@@ -813,6 +813,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
         }
         OlapScanner* scanner = new OlapScanner(state, this, _olap_scan_node.is_preaggregation,
                                                _need_agg_finalize, *scan_range);
+        scanner->set_batch_size(_batch_size);
         // add scanner to pool before doing prepare.
         // so that scanner can be automatically deconstructed if prepare failed.
         _scanner_pool.add(scanner);
@@ -1359,7 +1360,12 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
         }
     }
 
-    ThreadPoolToken* thread_token = state->get_query_fragments_ctx()->get_token();
+    ThreadPoolToken* thread_token = nullptr;
+    if (limit() != -1 && limit() < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
 
     /*********************************
      * The basic strategy of priority scheduling:
@@ -1605,7 +1611,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
                          << ", fragment id=" << print_id(_runtime_state->fragment_instance_id());
             break;
         }
-        RowBatch* row_batch = new RowBatch(this->row_desc(), state->batch_size(),
+        RowBatch* row_batch = new RowBatch(this->row_desc(), _batch_size,
                                            _runtime_state->fragment_mem_tracker().get());
         row_batch->set_scanner_id(scanner->id());
         status = scanner->get_batch(_runtime_state, row_batch, &eos);
@@ -1625,6 +1631,10 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes();
         }
         raw_rows_read = scanner->raw_rows_read();
+        if (limit() != -1 && raw_rows_read >= limit()) {
+            eos = true;
+            break;
+        }
     }
 
     {
@@ -1643,6 +1653,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
         std::lock_guard<SpinLock> guard(_status_mutex);
         global_status_ok = _status.ok();
     }
+
     if (UNLIKELY(!global_status_ok)) {
         eos = true;
         for (auto rb : row_batchs) {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 4cd2902b93..12cdb36f85 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -199,6 +199,8 @@ protected:
     // object is.
     ObjectPool _scanner_pool;
 
+    size_t _batch_size = 0;
+
     std::shared_ptr<std::thread> _transfer_thread;
 
     // Keeps track of total splits and the number finished.
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index bbf3cb9784..a5f29970ea 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,8 +60,7 @@ Status OlapScanner::prepare(
         bloom_filters) {
     set_tablet_reader();
     // set limit to reduce end of rowset and segment mem use
-    _tablet_reader->set_batch_size(_parent->limit() == -1 ? _parent->_runtime_state->batch_size() : std::min(
-            static_cast<int64_t>(_parent->_runtime_state->batch_size()), _parent->limit()));
+    _tablet_reader->set_batch_size(_parent->_batch_size);
 
     // Get olap table
     TTabletId tablet_id = scan_range.tablet_id;
@@ -269,9 +268,12 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
 
 Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
     // 2. Allocate Row's Tuple buf
-    uint8_t* tuple_buf =
-            batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size());
-    bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size());
+    uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(_batch_size * _tuple_desc->byte_size());
+    if (tuple_buf == nullptr) {
+        LOG(WARNING) << "Allocate mem for row batch failed.";
+        return Status::RuntimeError("Allocate mem for row batch failed.");
+    }
+    bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);
 
     std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
@@ -284,6 +286,11 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
         ObjectPool tmp_object_pool;
         // release the memory of the object which can't pass the conjuncts.
         ObjectPool unused_object_pool;
+        if (batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold) {
+            return Status::RuntimeError(
+                    "Scanner row bytes buffer is too small, please try to increase be config "
+                    "'doris_scanner_row_bytes'.");
+        }
         while (true) {
             // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break
             if (batch->is_full() ||
@@ -608,6 +615,7 @@ void OlapScanner::_update_realtime_counter() {
     COUNTER_UPDATE(_parent->_raw_rows_counter, stats.raw_rows_read);
     // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash
     _raw_rows_read += stats.raw_rows_read;
+    _tablet_reader->mutable_stats()->raw_rows_read = 0;
 }
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index fa0d7e4578..6be9c5c1aa 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -92,6 +92,8 @@ public:
     const std::vector<SlotDescriptor*>& get_query_slots() const { return _query_slots; }
 
+    void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
+
 protected:
     Status _init_tablet_reader_params(
             const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
@@ -141,6 +143,8 @@ protected:
     int64_t _raw_rows_read = 0;
     int64_t _compressed_bytes_read = 0;
 
+    size_t _batch_size = 0;
+
     // number rows filtered by pushed condition
     int64_t _num_rows_pushed_cond_filtered = 0;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e5d6a7176e..bad2c50c55 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -620,6 +620,15 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
         }
     }
+    if (params.__isset.fragment && params.fragment.__isset.plan &&
+        params.fragment.plan.nodes.size() > 0) {
+        for (auto& node : params.fragment.plan.nodes) {
+            if (node.limit > 0 && node.limit < 1024) {
+                fragments_ctx->set_serial_thread_token();
+                break;
+            }
+        }
+    }
 
     {
         // Find _fragments_ctx_map again, in case some other request has already
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 73f6b415b9..1e7b3874c6 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -61,11 +61,17 @@ public:
                     cpu_limit);
         }
     }
+    void set_serial_thread_token() {
+        _serial_thread_token = _exec_env->limited_scan_thread_pool()->new_token(
+                ThreadPool::ExecutionMode::SERIAL, 1);
+    }
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
+    ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); }
+
     void set_ready_to_execute() {
         {
             std::lock_guard<std::mutex> l(_start_lock);
@@ -111,6 +117,11 @@ private:
     // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
 
+    // A token used to submit olap scanner to the "_limited_scan_thread_pool" serially, it used for
+    // query like `select * limit 1`, this query used for limit the max scaner thread to 1 to avoid
+    // this query cost too much resource
+    std::unique_ptr<ThreadPoolToken> _serial_thread_token;
+
     std::mutex _start_lock;
     std::condition_variable _start_cond;
     // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 02d70b16e1..29ca5b2360 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -504,7 +504,7 @@ size_t RowBatch::get_batch_size(const PRowBatch& batch) {
 void RowBatch::acquire_state(RowBatch* src) {
     // DCHECK(_row_desc.equals(src->_row_desc));
     DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
-    DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
+    // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
     DCHECK_EQ(_auxiliary_mem_usage, 0);
 
     // The destination row batch should be empty.
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 4ea1c3f1f3..1ec25dce91 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -633,21 +633,45 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
     }
 
     // post volap scanners to thread-pool
-    PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+    ThreadPoolToken* thread_token = nullptr;
+    if (_limit > -1 && _limit < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
     auto iter = olap_scanners.begin();
-    while (iter != olap_scanners.end()) {
-        PriorityThreadPool::Task task;
-        task.work_function = std::bind(&VOlapScanNode::scanner_thread, this, *iter);
-        task.priority = _nice;
-        task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
-        (*iter)->start_wait_worker_timer();
-        COUNTER_UPDATE(_scanner_sched_counter, 1);
-        if (thread_pool->offer(task)) {
-            olap_scanners.erase(iter++);
-        } else {
-            LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+    if (thread_token != nullptr) {
+        while (iter != olap_scanners.end()) {
+            auto s = thread_token->submit_func([this, scanner = *iter] {
+                this->scanner_thread(scanner);
+            });
+            if (s.ok()) {
+                (*iter)->start_wait_worker_timer();
+                COUNTER_UPDATE(_scanner_sched_counter, 1);
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool! " << s.get_error_msg();
+            }
+            ++_total_assign_num;
+        }
+    } else {
+        PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+        while (iter != olap_scanners.end()) {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner = *iter] {
+                this->scanner_thread(scanner);
+            };
+            task.priority = _nice;
+            task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
+            (*iter)->start_wait_worker_timer();
+            COUNTER_UPDATE(_scanner_sched_counter, 1);
+            if (thread_pool->offer(task)) {
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+            }
+            ++_total_assign_num;
         }
-        ++_total_assign_num;
     }
 
     return assigned_thread_num;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
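For readers skimming the patch, the core scheduling idea can be restated outside of Doris as a small
standalone sketch: when a scan carries a small limit (the commit uses a threshold of 1024), run the
scanners through a single serial worker and stop as soon as enough rows have been produced; otherwise
fan them out across the scan thread pool. The names ScannerTask, run_serial, run_parallel and
kSerialLimitThreshold below are illustrative only and do not exist in Doris; the real code uses a
ThreadPoolToken created with ThreadPool::ExecutionMode::SERIAL on the limited_scan_thread_pool, as
shown in the diff above.

// Standalone sketch (not Doris code): choose between serial and parallel scanner
// scheduling based on the query limit, mirroring the logic added in this commit.
#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

// A "scanner" is modeled as a callable that returns how many rows it produced.
using ScannerTask = std::function<int64_t()>;

// Threshold mirrored from the commit: limits below 1024 are treated as "small".
constexpr int64_t kSerialLimitThreshold = 1024;

// Small limit: run all scanners on one thread (the analogue of submitting through a
// SERIAL ThreadPoolToken) and stop once `limit` rows have been read.
int64_t run_serial(const std::vector<ScannerTask>& scanners, int64_t limit) {
    int64_t rows = 0;
    for (const auto& scan : scanners) {
        rows += scan();
        if (limit != -1 && rows >= limit) {
            break;  // early exit, like the new `raw_rows_read >= limit()` check
        }
    }
    return rows;
}

// No small limit: fan scanners out across worker threads, one thread per scanner for brevity.
int64_t run_parallel(const std::vector<ScannerTask>& scanners) {
    std::atomic<int64_t> rows{0};
    std::vector<std::thread> workers;
    workers.reserve(scanners.size());
    for (const auto& scan : scanners) {
        workers.emplace_back([&rows, &scan] { rows += scan(); });
    }
    for (auto& w : workers) {
        w.join();
    }
    return rows;
}

int main() {
    // Ten fake scanners, each "reading" 100 rows.
    std::vector<ScannerTask> scanners(10, [] { return int64_t(100); });

    int64_t limit = 5;  // e.g. SELECT * FROM t LIMIT 5
    int64_t rows = (limit != -1 && limit < kSerialLimitThreshold)
                           ? run_serial(scanners, limit)  // one scanner thread, early exit
                           : run_parallel(scanners);      // full parallelism
    std::cout << "rows read: " << rows << std::endl;
    return 0;
}

The early break in run_serial mirrors the new `raw_rows_read >= limit()` check in
OlapScanNode::scanner_thread: scanning stops after the batch that satisfies the limit rather than
draining every tablet, which is where the reduced resource usage for small-limit queries comes from.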