This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7248420cfd1 [chore](session_variable) Add 'data_queue_max_blocks' to 
prevent the DataQueue from occupying too much memory. (#34017) (#34395)
7248420cfd1 is described below

commit 7248420cfd1679209651fd0e205a7dacd0e4ca5f
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Sun May 5 21:20:33 2024 +0800

    [chore](session_variable) Add 'data_queue_max_blocks' to prevent the 
DataQueue from occupying too much memory. (#34017) (#34395)
---
 be/src/pipeline/exec/data_queue.cpp                | 10 ++++++++-
 be/src/pipeline/exec/data_queue.h                  |  4 ++++
 be/src/pipeline/exec/union_sink_operator.cpp       |  1 +
 be/src/pipeline/pipeline_fragment_context.cpp      |  5 +++++
 be/src/runtime/runtime_state.h                     |  5 +++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 +++++++++
 gensrc/thrift/PaloInternalService.thrift           |  3 +++
 .../query_p0/aggregate/distinct_streaming_agg.out  |  6 +++++
 .../aggregate/distinct_streaming_agg.groovy        | 26 ++++++++++++++++++++++
 9 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/data_queue.cpp 
b/be/src/pipeline/exec/data_queue.cpp
index 06c16c7dfa6..e251ace57a4 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -119,10 +119,13 @@ Status 
DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
             }
             _cur_bytes_in_queue[_flag_queue_idx] -= 
(*output_block)->allocated_bytes();
             _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
+            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0 &&
+                _sink_dependencies[_flag_queue_idx] != nullptr) {
+                _sink_dependencies[_flag_queue_idx]->set_ready();
+            }
             auto old_value = _cur_blocks_total_nums.fetch_sub(1);
             if (old_value == 1 && _source_dependency) {
                 set_source_block();
-                _sink_dependencies[_flag_queue_idx]->set_ready();
             }
         } else {
             if (_is_finished[_flag_queue_idx]) {
@@ -142,6 +145,11 @@ void 
DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
         _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
         _queue_blocks[child_idx].emplace_back(std::move(block));
         _cur_blocks_nums_in_queue[child_idx] += 1;
+
+        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue &&
+            _sink_dependencies[child_idx] != nullptr) {
+            _sink_dependencies[child_idx]->block();
+        }
         _cur_blocks_total_nums++;
         if (_source_dependency) {
             set_source_ready();
diff --git a/be/src/pipeline/exec/data_queue.h 
b/be/src/pipeline/exec/data_queue.h
index 1a50b7485d1..f5bd84cc278 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -70,6 +70,8 @@ public:
     void set_source_ready();
     void set_source_block();
 
+    void set_max_blocks_in_sub_queue(int64_t max_blocks) { 
_max_blocks_in_sub_queue = max_blocks; }
+
 private:
     std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
     std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
@@ -93,6 +95,8 @@ private:
     // only used by streaming agg source operator
     bool _data_exhausted = false;
 
+    int64_t _max_blocks_in_sub_queue = 1;
+
     //this only use to record the queue[0] for profile
     int64_t _max_bytes_in_queue = 0;
     int64_t _max_size_of_queue = 0;
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index e466237a375..40344882a84 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) {
     for (size_t i = 0; i < p._child_expr.size(); i++) {
         RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
     }
+    
_shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index e53492c9fa0..e3f9de5fd03 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         } else {
             int child_count = union_node->children_count();
             auto data_queue = std::make_shared<DataQueue>(child_count);
+            
data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks());
             for (int child_id = 0; child_id < child_count; ++child_id) {
                 auto new_child_pipeline = add_pipeline();
                 RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), 
new_child_pipeline));
@@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
                                          std::to_string(agg_node->id()) +
                                          ": group by and output is empty");
         }
+
+        const int64_t data_queue_max_blocks = 
_runtime_state->data_queue_max_blocks();
         if (agg_node->is_aggregate_evaluators_empty() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink =
                     
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                               
data_queue);
@@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
         } else if (agg_node->is_streaming_preagg() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e01eb6166dc..e4bec15bf63 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -431,6 +431,11 @@ public:
         return _query_options.__isset.skip_missing_version && 
_query_options.skip_missing_version;
     }
 
+    int64_t data_queue_max_blocks() const {
+        return _query_options.__isset.data_queue_max_blocks ? 
_query_options.data_queue_max_blocks
+                                                            : 1;
+    }
+
     bool enable_page_cache() const;
 
     int partitioned_hash_join_rows_threshold() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d89ff2ea8df..7ad72283d44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
     public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
     public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+    public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
@@ -1745,6 +1746,13 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true, fuzzy = true)
     public boolean enableAggSpill = false;
 
+    @VariableMgr.VarAttr(
+            name = DATA_QUEUE_MAX_BLOCKS,
+            description = {"DataQueue 中每个子队列允许最大的 block 个数",
+                    "Max blocks in DataQueue."},
+            needForward = true, fuzzy = true)
+    public long dataQueueMaxBlocks = 1;
+
     // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3171,6 +3179,8 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableSortSpill(enableSortSpill);
         tResult.setEnableAggSpill(enableAggSpill);
         tResult.setMinRevocableMem(minRevocableMem);
+        tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 3265cb47b26..2f7886e10c2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -285,6 +285,9 @@ struct TQueryOptions {
   104: optional i64 min_revocable_mem = 0
 
   105: optional i64 spill_streaming_agg_mem_limit = 0;
+
+  // max rows of each sub-queue in DataQueue.
+  106: optional i64 data_queue_max_blocks = 0;
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
diff --git a/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out 
b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out
new file mode 100644
index 00000000000..d2ac59bdede
--- /dev/null
+++ b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+true
+false
+\N
+
diff --git 
a/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy 
b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy
new file mode 100644
index 00000000000..bd3285a60f9
--- /dev/null
+++ b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+suite("distinct_streaming_agg") {
+    sql """ use test_query_db; """
+
+    qt_select """
+        select k6 from baseall union select k6 from bigtable order by 1;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to