This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
commit b6b94cb576b712e74c28ee8e43fadf3dd062d12e
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Fri Dec 13 18:22:47 2024 +0800

    test
---
 be/src/common/config.cpp                           |  2 ++
 be/src/common/config.h                             |  2 ++
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  8 ++++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 33 ++++++++++++++--------
 be/src/pipeline/pipeline_task.cpp                  | 32 ++++++++++++++++++---
 6 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1c66d88dfe5..89fcea3372d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1396,6 +1396,8 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
 DEFINE_Bool(enable_table_size_correctness_check, "false");
 DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
 
+DEFINE_mInt32(revocable_memory_bytes_high_watermark, "5");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29e55e64063..8e308c1794d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1482,6 +1482,8 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
 // Enable validation to check the correctness of table size.
 DECLARE_Bool(enable_table_size_correctness_check);
 
+DECLARE_mInt32(revocable_memory_bytes_high_watermark);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 19ca5292c2f..a8122dd11ed 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -682,4 +682,12 @@ size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const {
     return local_state._memory_used_counter->value();
 }
 
+std::string HashJoinBuildSinkOperatorX::get_memory_usage_debug_str(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return fmt::format("build block: {}, hash table: {}, build key arena: {}",
+                       PrettyPrinter::print_bytes(local_state._build_blocks_memory_usage->value()),
+                       PrettyPrinter::print_bytes(local_state._hash_table_memory_usage->value()),
+                       PrettyPrinter::print_bytes(local_state._build_arena_memory_usage->value()));
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 1e1405e149c..074ff33abb2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -126,6 +126,8 @@ public:
 
     [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
 
+    std::string get_memory_usage_debug_str(RuntimeState* state) const;
+
     bool should_dry_run(RuntimeState* state) override {
         return _is_broadcast_join && !state->get_sink_local_state()
                                               ->cast<HashJoinBuildSinkLocalState>()
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 09e13eb465d..fe78489cb98 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -582,14 +582,20 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
 
     local_state._child_eos = eos;
 
+    Defer defer_dgb {[&]() {
+        if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) {
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id()
+                       << " sink " << node_id() << " _child_eos: " << local_state._child_eos
+                       << ", revocable memory: " << local_state.revocable_mem_size(state);
+        }
+    }};
     const auto rows = in_block->rows();
 
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
             VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id()
-                       << " sink " << node_id() << " eos, set_ready_to_read"
-                       << ", need spill: " << need_to_spill;
+                       << " sink " << node_id() << " eos, need spill: " << need_to_spill;
 
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
@@ -605,11 +611,13 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 Defer defer {[&]() { local_state.update_memory_usage(); }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), in_block, eos));
-                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task "
-                           << state->task_id() << " sink " << node_id() << " eos, set_ready_to_read"
-                           << ", nonspill build usage: "
-                           << PrettyPrinter::print_bytes(_inner_sink_operator->get_memory_usage(
-                                      local_state._shared_state->inner_runtime_state.get()));
+
+                VLOG_DEBUG << fmt::format(
+                        "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory "
+                        "usage: {}",
+                        print_id(state->query_id()), state->task_id(), node_id(),
+                        _inner_sink_operator->get_memory_usage_debug_str(
+                                local_state._shared_state->inner_runtime_state.get()));
             }
 
             std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -646,11 +654,12 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " task " << state->task_id()
-                       << " sink " << node_id() << " eos, set_ready_to_read"
-                       << ", nonspill build usage: "
-                       << _inner_sink_operator->get_memory_usage(
-                                  local_state._shared_state->inner_runtime_state.get());
+            VLOG_DEBUG << fmt::format(
+                    "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory "
+                    "usage: {}",
+                    print_id(state->query_id()), state->task_id(), node_id(),
+                    _inner_sink_operator->get_memory_usage_debug_str(
+                            local_state._shared_state->inner_runtime_state.get()));
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index a8b7dd06b27..6bf4cb39fd1 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -42,6 +42,7 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
@@ -492,10 +493,33 @@ Status PipelineTask::execute(bool* eos) {
                     // it will not be able to spill later even if memory is low, and will cause cancel of queries.
                     // So make a check here, if it's low watermark after reserve and if reserved memory is too many,
                     // then trigger revoke memory.
-                    if (is_low_watermark &&
-                        sink_reserve_size >= workload_group->memory_limit() * 0.05) {
-                        RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
-                        continue;
+
+                    // debug
+                    if (sink_reserve_size > 64 * 1024 * 1024) {
+                        LOG(INFO) << fmt::format(
+                                "Query: {}, sink name: {}, node id: {}, task id: {}, "
+                                "is_low_watermark: {}, sink_reserve_size: {}, wg mem limit: {}, "
+                                "reserve/wg_limit: {}",
+                                print_id(query_id), _sink->get_name(), _sink->node_id(),
+                                _state->task_id(), is_low_watermark,
+                                PrettyPrinter::print_bytes(sink_reserve_size),
+                                PrettyPrinter::print_bytes(workload_group->memory_limit()),
+                                ((double)sink_reserve_size) / workload_group->memory_limit());
+                    }
+                    if (is_low_watermark) {
+                        const auto revocable_size = _sink->revocable_mem_size(_state);
+                        if (revocable_size >= config::revocable_memory_bytes_high_watermark) {
+                            LOG(INFO) << fmt::format(
+                                    "Query: {}, sink name: {}, node id: {}, task id: {}, "
+                                    "sink_reserve_size: {}, revoke_memory "
+                                    "because revocable memory is high: {}",
+                                    print_id(query_id), _sink->get_name(), _sink->node_id(),
+                                    _state->task_id(),
+                                    PrettyPrinter::print_bytes(sink_reserve_size),
+                                    PrettyPrinter::print_bytes(revocable_size));
+                            RETURN_IF_ERROR(_sink->revoke_memory(_state, nullptr));
+                            continue;
+                        }
                     }
                 }
             }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
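P.S. For readers following the pipeline_task.cpp change: the snippet below is not part of the commit. It is a minimal, self-contained C++ sketch of how the revoke trigger in PipelineTask::execute moves from "reserved size >= 5% of the workload group limit" to "revocable size >= config::revocable_memory_bytes_high_watermark", with the operator and workload-group state reduced to plain parameters. All names and numbers here are illustrative only.

    #include <cstdint>
    #include <iostream>

    // Old rule (removed by this commit): revoke when the group is at its low
    // watermark and the attempted reservation exceeds 5% of the group limit.
    bool should_revoke_old(bool is_low_watermark, int64_t sink_reserve_size,
                           int64_t wg_memory_limit) {
        return is_low_watermark &&
               sink_reserve_size >= static_cast<int64_t>(wg_memory_limit * 0.05);
    }

    // New rule (added by this commit): revoke when the group is at its low
    // watermark and the sink's revocable (spillable) memory reaches the new
    // revocable_memory_bytes_high_watermark config value.
    bool should_revoke_new(bool is_low_watermark, int64_t revocable_size,
                           int64_t revocable_memory_bytes_high_watermark) {
        return is_low_watermark &&
               revocable_size >= revocable_memory_bytes_high_watermark;
    }

    int main() {
        const int64_t wg_limit = 8LL * 1024 * 1024 * 1024; // hypothetical 8 GB group limit
        const int64_t reserve = 200LL * 1024 * 1024;       // hypothetical 200 MB reservation
        const int64_t revocable = 512LL * 1024 * 1024;     // hypothetical 512 MB revocable memory
        const int64_t watermark = 5;                       // default of the new config option

        std::cout << "old rule revokes: " << should_revoke_old(true, reserve, wg_limit) << "\n"
                  << "new rule revokes: " << should_revoke_new(true, revocable, watermark) << "\n";
        return 0;
    }

Note that the new rule compares against a byte count taken directly from the config (default "5"), rather than a percentage of the workload group limit as before.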