This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 6e40592ca1 [bug](scanner) Improve limit query performance on 
olapScannode and avoid infinite loop (#11301)
6e40592ca1 is described below

commit 6e40592ca1d1eb8c53d0ad0a3a0e2be15a4dfcdf
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon Aug 1 13:50:12 2022 +0800

    [bug](scanner) Improve limit query performance on olapScannode and avoid 
infinite loop (#11301)
    
    1. Fix a bug that query large column table may cause infinite loop
    2. Optimize the query logic with limit, for the case where the limit value 
is relatively small, reduce the parallelism of the scanner, reduce unnecessary 
resource consumption, and increase the number of similar queries that the 
system can carry at the same time, and increase the query speed by more than 60%
---
 be/src/exec/olap_scan_node.cpp       | 19 +++++++++++---
 be/src/exec/olap_scan_node.h         |  2 ++
 be/src/exec/olap_scanner.cpp         | 18 +++++++++----
 be/src/exec/olap_scanner.h           |  4 +++
 be/src/runtime/fragment_mgr.cpp      |  9 +++++++
 be/src/runtime/query_fragments_ctx.h | 11 ++++++++
 be/src/runtime/row_batch.cpp         |  2 +-
 be/src/vec/exec/volap_scan_node.cpp  | 50 ++++++++++++++++++++++++++----------
 8 files changed, 92 insertions(+), 23 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index b3e63ee8cd..9728922a0a 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -92,7 +92,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
 
         _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
     }
-
+    _batch_size = _limit == -1 ? state->batch_size()
+                               : 
std::min(static_cast<int64_t>(state->batch_size()), _limit);
     return Status::OK();
 }
 
@@ -260,7 +261,6 @@ Status OlapScanNode::open(RuntimeState* state) {
 Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
-
     // check if Canceled.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_row_batches_lock);
@@ -813,6 +813,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) 
{
             }
             OlapScanner* scanner = new OlapScanner(state, this, 
_olap_scan_node.is_preaggregation,
                                                    _need_agg_finalize, 
*scan_range);
+            scanner->set_batch_size(_batch_size);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare 
failed.
             _scanner_pool.add(scanner);
@@ -1359,7 +1360,12 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
         }
     }
 
-    ThreadPoolToken* thread_token = 
state->get_query_fragments_ctx()->get_token();
+    ThreadPoolToken* thread_token = nullptr;
+    if (limit() != -1 && limit() < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
 
     /*********************************
      * The basic strategy of priority scheduling:
@@ -1605,7 +1611,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
                        << ", fragment id=" << 
print_id(_runtime_state->fragment_instance_id());
             break;
         }
-        RowBatch* row_batch = new RowBatch(this->row_desc(), 
state->batch_size(),
+        RowBatch* row_batch = new RowBatch(this->row_desc(), _batch_size,
                                            
_runtime_state->fragment_mem_tracker().get());
         row_batch->set_scanner_id(scanner->id());
         status = scanner->get_batch(_runtime_state, row_batch, &eos);
@@ -1625,6 +1631,10 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             raw_bytes_read += 
row_batch->tuple_data_pool()->total_reserved_bytes();
         }
         raw_rows_read = scanner->raw_rows_read();
+        if (limit() != -1 && raw_rows_read >= limit()) {
+            eos = true;
+            break;
+        }
     }
 
     {
@@ -1643,6 +1653,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             std::lock_guard<SpinLock> guard(_status_mutex);
             global_status_ok = _status.ok();
         }
+
         if (UNLIKELY(!global_status_ok)) {
             eos = true;
             for (auto rb : row_batchs) {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 4cd2902b93..12cdb36f85 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -199,6 +199,8 @@ protected:
     // object is.
     ObjectPool _scanner_pool;
 
+    size_t _batch_size = 0;
+
     std::shared_ptr<std::thread> _transfer_thread;
 
     // Keeps track of total splits and the number finished.
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index bbf3cb9784..a5f29970ea 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,8 +60,7 @@ Status OlapScanner::prepare(
                 bloom_filters) {
     set_tablet_reader();
     // set limit to reduce end of rowset and segment mem use
-    _tablet_reader->set_batch_size(_parent->limit() == -1 ? 
_parent->_runtime_state->batch_size() : std::min(
-            static_cast<int64_t>(_parent->_runtime_state->batch_size()), 
_parent->limit()));
+    _tablet_reader->set_batch_size(_parent->_batch_size);
 
     // Get olap table
     TTabletId tablet_id = scan_range.tablet_id;
@@ -269,9 +268,12 @@ Status OlapScanner::_init_return_columns(bool 
need_seq_col) {
 
 Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) 
{
     // 2. Allocate Row's Tuple buf
-    uint8_t* tuple_buf =
-            batch->tuple_data_pool()->allocate(state->batch_size() * 
_tuple_desc->byte_size());
-    bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size());
+    uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(_batch_size * 
_tuple_desc->byte_size());
+    if (tuple_buf == nullptr) {
+        LOG(WARNING) << "Allocate mem for row batch failed.";
+        return Status::RuntimeError("Allocate mem for row batch failed.");
+    }
+    bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);
 
     std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
@@ -284,6 +286,11 @@ Status OlapScanner::get_batch(RuntimeState* state, 
RowBatch* batch, bool* eof) {
         ObjectPool tmp_object_pool;
         // release the memory of the object which can't pass the conjuncts.
         ObjectPool unused_object_pool;
+        if (batch->tuple_data_pool()->total_reserved_bytes() >= 
raw_bytes_threshold) {
+            return Status::RuntimeError(
+                    "Scanner row bytes buffer is too small, please try to 
increase be config "
+                    "'doris_scanner_row_bytes'.");
+        }
         while (true) {
             // Batch is full or reach raw_rows_threshold or 
raw_bytes_threshold, break
             if (batch->is_full() ||
@@ -608,6 +615,7 @@ void OlapScanner::_update_realtime_counter() {
     COUNTER_UPDATE(_parent->_raw_rows_counter, stats.raw_rows_read);
     // if raw_rows_read is reset, scanNode will scan all table rows which may 
cause BE crash
     _raw_rows_read += stats.raw_rows_read;
+
     _tablet_reader->mutable_stats()->raw_rows_read = 0;
 }
 
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index fa0d7e4578..6be9c5c1aa 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -92,6 +92,8 @@ public:
 
     const std::vector<SlotDescriptor*>& get_query_slots() const { return 
_query_slots; }
 
+    void set_batch_size(size_t batch_size) { _batch_size = batch_size; }
+
 protected:
     Status _init_tablet_reader_params(
             const std::vector<OlapScanRange*>& key_ranges, const 
std::vector<TCondition>& filters,
@@ -141,6 +143,8 @@ protected:
     int64_t _raw_rows_read = 0;
     int64_t _compressed_bytes_read = 0;
 
+    size_t _batch_size = 0;
+
     // number rows filtered by pushed condition
     int64_t _num_rows_pushed_cond_filtered = 0;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e5d6a7176e..bad2c50c55 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -620,6 +620,15 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
                 
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
             }
         }
+        if (params.__isset.fragment && params.fragment.__isset.plan &&
+            params.fragment.plan.nodes.size() > 0) {
+            for (auto& node : params.fragment.plan.nodes) {
+                if (node.limit > 0 && node.limit < 1024) {
+                    fragments_ctx->set_serial_thread_token();
+                    break;
+                }
+            }
+        }
 
         {
             // Find _fragments_ctx_map again, in case some other request has 
already
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 73f6b415b9..1e7b3874c6 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -61,11 +61,17 @@ public:
                     cpu_limit);
         }
     }
+    void set_serial_thread_token() {
+        _serial_thread_token = 
_exec_env->limited_scan_thread_pool()->new_token(
+                ThreadPool::ExecutionMode::SERIAL, 1);
+    }
 
     ThreadPoolToken* get_token() {
         return _thread_token.get();
     }
 
+    ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); }
+
     void set_ready_to_execute() {
         {
             std::lock_guard<std::mutex> l(_start_lock);
@@ -111,6 +117,11 @@ private:
     // If this token is not set, the scanner will be executed in 
"_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
 
+    // A token used to submit olap scanner to the "_limited_scan_thread_pool" 
serially, it used for
+    // query like `select * limit 1`, this query used for limit the max scaner 
thread to 1 to avoid
+    // this query cost too much resource
+    std::unique_ptr<ThreadPoolToken> _serial_thread_token;
+
     std::mutex _start_lock;
     std::condition_variable _start_cond;
     // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 02d70b16e1..29ca5b2360 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -504,7 +504,7 @@ size_t RowBatch::get_batch_size(const PRowBatch& batch) {
 void RowBatch::acquire_state(RowBatch* src) {
     // DCHECK(_row_desc.equals(src->_row_desc));
     DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
-    DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
+    // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
     DCHECK_EQ(_auxiliary_mem_usage, 0);
 
     // The destination row batch should be empty.
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 4ea1c3f1f3..1ec25dce91 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -633,21 +633,45 @@ int 
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
     }
 
     // post volap scanners to thread-pool
-    PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
+    ThreadPoolToken* thread_token = nullptr;
+    if (_limit > -1 && _limit < 1024) {
+        thread_token = state->get_query_fragments_ctx()->get_serial_token();
+    } else {
+        thread_token = state->get_query_fragments_ctx()->get_token();
+    }
     auto iter = olap_scanners.begin();
-    while (iter != olap_scanners.end()) {
-        PriorityThreadPool::Task task;
-        task.work_function = std::bind(&VOlapScanNode::scanner_thread, this, 
*iter);
-        task.priority = _nice;
-        task.queue_id = 
state->exec_env()->store_path_to_index((*iter)->scan_disk());
-        (*iter)->start_wait_worker_timer();
-        COUNTER_UPDATE(_scanner_sched_counter, 1);
-        if (thread_pool->offer(task)) {
-            olap_scanners.erase(iter++);
-        } else {
-            LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+    if (thread_token != nullptr) {
+        while (iter != olap_scanners.end()) {
+            auto s = thread_token->submit_func([this, scanner = *iter] {
+                this->scanner_thread(scanner);
+            });
+            if (s.ok()) {
+                (*iter)->start_wait_worker_timer();
+                COUNTER_UPDATE(_scanner_sched_counter, 1);
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool! " 
<< s.get_error_msg();
+            }
+            ++_total_assign_num;
+        }
+    } else {
+        PriorityThreadPool* thread_pool = 
state->exec_env()->scan_thread_pool();
+        while (iter != olap_scanners.end()) {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner = *iter] {
+                this->scanner_thread(scanner);
+            };
+            task.priority = _nice;
+            task.queue_id = 
state->exec_env()->store_path_to_index((*iter)->scan_disk());
+            (*iter)->start_wait_worker_timer();
+            COUNTER_UPDATE(_scanner_sched_counter, 1);
+            if (thread_pool->offer(task)) {
+                olap_scanners.erase(iter++);
+            } else {
+                LOG(FATAL) << "Failed to assign scanner task to thread pool!";
+            }
+            ++_total_assign_num;
         }
-        ++_total_assign_num;
     }
 
     return assigned_thread_num;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to