This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 7248420cfd1 [chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) (#34395) 7248420cfd1 is described below commit 7248420cfd1679209651fd0e205a7dacd0e4ca5f Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Sun May 5 21:20:33 2024 +0800 [chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) (#34395) --- be/src/pipeline/exec/data_queue.cpp | 10 ++++++++- be/src/pipeline/exec/data_queue.h | 4 ++++ be/src/pipeline/exec/union_sink_operator.cpp | 1 + be/src/pipeline/pipeline_fragment_context.cpp | 5 +++++ be/src/runtime/runtime_state.h | 5 +++++ .../java/org/apache/doris/qe/SessionVariable.java | 10 +++++++++ gensrc/thrift/PaloInternalService.thrift | 3 +++ .../query_p0/aggregate/distinct_streaming_agg.out | 6 +++++ .../aggregate/distinct_streaming_agg.groovy | 26 ++++++++++++++++++++++ 9 files changed, 69 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 06c16c7dfa6..e251ace57a4 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -119,10 +119,13 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu } _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; + if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0 && + _sink_dependencies[_flag_queue_idx] != nullptr) { + _sink_dependencies[_flag_queue_idx]->set_ready(); + } auto old_value = _cur_blocks_total_nums.fetch_sub(1); if (old_value == 1 && _source_dependency) { set_source_block(); - _sink_dependencies[_flag_queue_idx]->set_ready(); } } else { if (_is_finished[_flag_queue_idx]) { @@ -142,6 +145,11 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); _queue_blocks[child_idx].emplace_back(std::move(block)); _cur_blocks_nums_in_queue[child_idx] += 1; + + if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue && + _sink_dependencies[child_idx] != nullptr) { + _sink_dependencies[child_idx]->block(); + } _cur_blocks_total_nums++; if (_source_dependency) { set_source_ready(); diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index 1a50b7485d1..f5bd84cc278 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -70,6 +70,8 @@ public: void set_source_ready(); void set_source_block(); + void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; } + private: std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock; std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks; @@ -93,6 +95,8 @@ private: // only used by streaming agg source operator bool _data_exhausted = false; + int64_t _max_blocks_in_sub_queue = 1; + //this only use to record the queue[0] for profile int64_t _max_bytes_in_queue = 0; int64_t _max_size_of_queue = 0; diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index e466237a375..40344882a84 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) { for (size_t i = 0; i < p._child_expr.size(); i++) { RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); } + _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks()); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e53492c9fa0..e3f9de5fd03 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur } else { int child_count = union_node->children_count(); auto data_queue = std::make_shared<DataQueue>(child_count); + data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks()); for (int child_id = 0; child_id < child_count; ++child_id) { auto new_child_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline)); @@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur std::to_string(agg_node->id()) + ": group by and output is empty"); } + + const int64_t data_queue_max_blocks = _runtime_state->data_queue_max_blocks(); if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared<DataQueue>(1); + data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks); OperatorBuilderPtr pre_agg_sink = std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node, data_queue); @@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source)); } else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) { auto data_queue = std::make_shared<DataQueue>(1); + data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks); OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>( node->id(), agg_node, data_queue); RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink)); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e01eb6166dc..e4bec15bf63 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -431,6 +431,11 @@ public: return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version; } + int64_t data_queue_max_blocks() const { + return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks + : 1; + } + bool enable_page_cache() const; int partitioned_hash_join_rows_threshold() const { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d89ff2ea8df..7ad72283d44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_JOIN_SPILL = "enable_join_spill"; public static final String ENABLE_SORT_SPILL = "enable_sort_spill"; public static final String ENABLE_AGG_SPILL = "enable_agg_spill"; + public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; @@ -1745,6 +1746,13 @@ public class SessionVariable implements Serializable, Writable { needForward = true, fuzzy = true) public boolean enableAggSpill = false; + @VariableMgr.VarAttr( + name = DATA_QUEUE_MAX_BLOCKS, + description = {"DataQueue 中每个子队列允许最大的 block 个数", + "Max blocks in DataQueue."}, + needForward = true, fuzzy = true) + public long dataQueueMaxBlocks = 1; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -3171,6 +3179,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableSortSpill(enableSortSpill); tResult.setEnableAggSpill(enableAggSpill); tResult.setMinRevocableMem(minRevocableMem); + tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 3265cb47b26..2f7886e10c2 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -285,6 +285,9 @@ struct TQueryOptions { 104: optional i64 min_revocable_mem = 0 105: optional i64 spill_streaming_agg_mem_limit = 0; + + // max rows of each sub-queue in DataQueue. + 106: optional i64 data_queue_max_blocks = 0; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false diff --git a/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out new file mode 100644 index 00000000000..d2ac59bdede --- /dev/null +++ b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +true +false +\N + diff --git a/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy new file mode 100644 index 00000000000..bd3285a60f9 --- /dev/null +++ b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +suite("distinct_streaming_agg") { + sql """ use test_query_db; """ + + qt_select """ + select k6 from baseall union select k6 from bigtable order by 1; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org