This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b3b13d27993e3906525c96e888607e26ca2e8048
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Apr 24 11:10:04 2024 +0800

    [chore](session_variable) Add 'data_queue_max_blocks' to prevent the 
DataQueue from occupying too much memory. (#34017)
---
 be/src/pipeline/exec/data_queue.cpp                              | 8 +++++++-
 be/src/pipeline/exec/data_queue.h                                | 4 ++++
 be/src/pipeline/exec/union_sink_operator.cpp                     | 1 +
 be/src/pipeline/pipeline_fragment_context.cpp                    | 5 +++++
 be/src/runtime/runtime_state.h                                   | 5 +++++
 .../src/main/java/org/apache/doris/qe/SessionVariable.java       | 9 +++++++++
 gensrc/thrift/PaloInternalService.thrift                         | 3 +++
 7 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/data_queue.cpp 
b/be/src/pipeline/exec/data_queue.cpp
index 06c16c7dfa6..d248edd9081 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -119,10 +119,12 @@ Status 
DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
             }
             _cur_bytes_in_queue[_flag_queue_idx] -= 
(*output_block)->allocated_bytes();
             _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
+            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
+                _sink_dependencies[_flag_queue_idx]->set_ready();
+            }
             auto old_value = _cur_blocks_total_nums.fetch_sub(1);
             if (old_value == 1 && _source_dependency) {
                 set_source_block();
-                _sink_dependencies[_flag_queue_idx]->set_ready();
             }
         } else {
             if (_is_finished[_flag_queue_idx]) {
@@ -142,6 +144,10 @@ void 
DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
         _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
         _queue_blocks[child_idx].emplace_back(std::move(block));
         _cur_blocks_nums_in_queue[child_idx] += 1;
+
+        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
+            _sink_dependencies[child_idx]->block();
+        }
         _cur_blocks_total_nums++;
         if (_source_dependency) {
             set_source_ready();
diff --git a/be/src/pipeline/exec/data_queue.h 
b/be/src/pipeline/exec/data_queue.h
index 1a50b7485d1..f5bd84cc278 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -70,6 +70,8 @@ public:
     void set_source_ready();
     void set_source_block();
 
+    void set_max_blocks_in_sub_queue(int64_t max_blocks) { 
_max_blocks_in_sub_queue = max_blocks; }
+
 private:
     std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
     std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
@@ -93,6 +95,8 @@ private:
     // only used by streaming agg source operator
     bool _data_exhausted = false;
 
+    int64_t _max_blocks_in_sub_queue = 1;
+
     //this only use to record the queue[0] for profile
     int64_t _max_bytes_in_queue = 0;
     int64_t _max_size_of_queue = 0;
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index ce1195f042b..41548e4a551 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -103,6 +103,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     for (size_t i = 0; i < p._child_expr.size(); i++) {
         RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
     }
+    
_shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
     return Status::OK();
 };
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d3a5c514c97..5292bc65ce9 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -543,6 +543,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         } else {
             int child_count = union_node->children_count();
             auto data_queue = std::make_shared<DataQueue>(child_count);
+            
data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks());
             for (int child_id = 0; child_id < child_count; ++child_id) {
                 auto new_child_pipeline = add_pipeline();
                 RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), 
new_child_pipeline));
@@ -565,8 +566,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
                                          std::to_string(agg_node->id()) +
                                          ": group by and output is empty");
         }
+
+        const int64_t data_queue_max_blocks = 
_runtime_state->data_queue_max_blocks();
         if (agg_node->is_aggregate_evaluators_empty() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink =
                     
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                               
data_queue);
@@ -578,6 +582,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
         } else if (agg_node->is_streaming_preagg() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
+            data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 07655c71b6c..c2e52e440ec 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -430,6 +430,11 @@ public:
         return _query_options.__isset.skip_missing_version && 
_query_options.skip_missing_version;
     }
 
+    int64_t data_queue_max_blocks() const {
+        return _query_options.__isset.data_queue_max_blocks ? 
_query_options.data_queue_max_blocks
+                                                            : 1;
+    }
+
     bool enable_page_cache() const;
 
     int partitioned_hash_join_rows_threshold() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 83cc8560e6e..1c349d29571 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -500,6 +500,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
     public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
     public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+    public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
 
@@ -1732,6 +1733,13 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true, fuzzy = true)
     public boolean enableAggSpill = false;
 
+    @VariableMgr.VarAttr(
+            name = DATA_QUEUE_MAX_BLOCKS,
+            description = {"DataQueue 中每个子队列允许最大的 block 个数",
+                    "Max blocks in DataQueue."},
+            needForward = true, fuzzy = true)
+    public long dataQueueMaxBlocks = 1;
+
     // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
     // Set to 0 to disable; min: 128M
     public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3184,6 +3192,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableSortSpill(enableSortSpill);
         tResult.setEnableAggSpill(enableAggSpill);
         tResult.setMinRevocableMem(minRevocableMem);
+        tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b71ddfa21a3..24d56ef8df0 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -285,6 +285,9 @@ struct TQueryOptions {
   104: optional i64 min_revocable_mem = 0
 
   105: optional i64 spill_streaming_agg_mem_limit = 0;
+
+  // max blocks allowed in each sub-queue of the DataQueue.
+  106: optional i64 data_queue_max_blocks = 1;
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to