This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 92ee09e0fd5 fix 92ee09e0fd5 is described below commit 92ee09e0fd5f97ae1f8b757bf7bb6176ef71e6b0 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Mon Dec 16 16:02:03 2024 +0800 fix --- .../exec/partitioned_hash_join_sink_operator.cpp | 34 ++++++++++++++++---- be/src/pipeline/pipeline_task.cpp | 37 ---------------------- be/src/runtime/runtime_state.h | 7 ++++ .../java/org/apache/doris/qe/SessionVariable.java | 5 +++ gensrc/thrift/PaloInternalService.thrift | 1 + 5 files changed, 41 insertions(+), 43 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 672eb36a907..dad82b6cc8a 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -260,7 +260,22 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( } } - return Status::OK(); + Status status; + if (_child_eos) { + std::for_each(_shared_state->partitioned_build_blocks.begin(), + _shared_state->partitioned_build_blocks.end(), [&](auto& block) { + if (block) { + COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); + } + }); + status = _finish_spilling(); + VLOG_DEBUG << fmt::format( + "Query: {}, task {}, sink {} _revoke_unpartitioned_block set_ready_to_read", + print_id(state->query_id()), state->task_id(), _parent->node_id()); + _dependency->set_ready_to_read(); + } + + return status; }; auto exception_catch_func = [spill_func]() mutable { @@ -547,12 +562,19 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B return revoke_memory(state, nullptr); } else { const auto revocable_size = revocable_mem_size(state); - if (revocable_size >= config::revocable_memory_bytes_high_watermark) { - LOG(INFO) << fmt::format( - "Query: {}, sink name: {}, node id: {}, task id: {}, " - "revoke_memory " + // TODO: consider parallel? + // After building hash table it will not be able to spill later + // even if memory is low, and will cause cancel of queries. + // So make a check here, if build blocks mem usage is too high, + // then trigger revoke memory. + auto query_mem_limit = state->get_query_ctx()->mem_limit(); + if (revocable_size >= (double)query_mem_limit / 100.0 * + state->revocable_memory_high_watermark_percent()) { + VLOG_DEBUG << fmt::format( + "Query: {}, task {}, sink {}, query mem limit: {}, revoke_memory " "because revocable memory is high: {}", - print_id(state->query_id()), get_name(), node_id(), state->task_id(), + print_id(state->query_id()), state->task_id(), node_id(), + PrettyPrinter::print_bytes(query_mem_limit), PrettyPrinter::print_bytes(revocable_size)); return revoke_memory(state, nullptr); } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index cd822ef15e2..d4ed0790942 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -494,43 +494,6 @@ Status PipelineTask::execute(bool* eos) { *eos = false; continue; } - if (workload_group) { - bool is_low_watermark = false; - bool is_high_watermark = false; - workload_group->check_mem_used(&is_low_watermark, &is_high_watermark); - // for hash join build sink, if it's eos at this reserve, it will build hash table and - // it will not be able to spill later even if memory is low, and will cause cancel of queries. - // So make a check here, if it's low watermark after reserve and if reserved memory is too many, - // then trigger revoke memory. - - // debug - if (sink_reserve_size > 64 * 1024 * 1024) { - LOG(INFO) << fmt::format( - "Query: {}, sink name: {}, node id: {}, task id: {}, " - "is_low_watermark: {}, sink_reserve_size: {}, wg mem limit: {}, " - "reserve/wg_limit: {}", - print_id(query_id), _sink->get_name(), _sink->node_id(), - _state->task_id(), is_low_watermark, - PrettyPrinter::print_bytes(sink_reserve_size), - PrettyPrinter::print_bytes(workload_group->memory_limit()), - ((double)sink_reserve_size) / workload_group->memory_limit()); - } - if (is_low_watermark) { - const auto revocable_size = _sink->revocable_mem_size(_state); - if (revocable_size >= config::revocable_memory_bytes_high_watermark) { - LOG(INFO) << fmt::format( - "Query: {}, sink name: {}, node id: {}, task id: {}, " - "sink_reserve_size: {}, revoke_memory " - "because revocable memory is high: {}", - print_id(query_id), _sink->get_name(), _sink->node_id(), - _state->task_id(), - PrettyPrinter::print_bytes(sink_reserve_size), - PrettyPrinter::print_bytes(revocable_size)); - RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr)); - continue; - } - } - } } // Define a lambda function to catch sink exception, because sink will check diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 4da3384cb60..7318c93f15a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -597,6 +597,13 @@ public: return 1; } + int revocable_memory_high_watermark_percent() const { + if (_query_options.__isset.revocable_memory_high_watermark_percent) { + return _query_options.revocable_memory_high_watermark_percent; + } + return 10; + } + size_t minimum_operator_memory_required_bytes() const { if (_query_options.__isset.minimum_operator_memory_required_kb) { return _query_options.minimum_operator_memory_required_kb * 1024; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index af379bc4135..16b214b1536 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -566,6 +566,7 @@ public class SessionVariable implements Serializable, Writable { public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit"; public static final String MIN_REVOCABLE_MEM = "min_revocable_mem"; public static final String ENABLE_SPILL = "enable_spill"; + public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "revocable_memory_high_watermark_percent"; public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory"; public static final String ENABLE_FORCE_SPILL = "enable_force_spill"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; @@ -2243,6 +2244,9 @@ public class SessionVariable implements Serializable, Writable { needForward = true, fuzzy = true) public long dataQueueMaxBlocks = 1; + @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true) + public int revocableMemoryHighWatermarkPercent = 10; + // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @@ -3957,6 +3961,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableForceSpill(enableForceSpill); tResult.setExternalAggPartitionBits(externalAggPartitionBits); tResult.setMinRevocableMem(minRevocableMem); + tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); tResult.setEnableLocalMergeSort(enableLocalMergeSort); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d3a2e4521f9..7ebe16583d8 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -367,6 +367,7 @@ struct TQueryOptions { 144: optional i32 query_slot_count = 0; 145: optional bool enable_spill = false 146: optional bool enable_reserve_memory = true + 147: optional i32 revocable_memory_high_watermark_percent = 10 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org