This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new ed3ca41faa3 Opt the spill logic (#41415) ed3ca41faa3 is described below commit ed3ca41faa300562002729006845877f014fad5d Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Fri Sep 27 19:37:16 2024 +0800 Opt the spill logic (#41415) 1. Add some counters in join/agg 2. Optimize the log, add some debug information 3. Revoke memory from non-sink operator(join probe) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 65 ++++++++++++---------- be/src/pipeline/exec/aggregation_sink_operator.h | 3 + .../pipeline/exec/aggregation_source_operator.cpp | 13 ++++- be/src/pipeline/exec/aggregation_source_operator.h | 2 + be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +- .../exec/partitioned_aggregation_sink_operator.cpp | 6 ++ .../exec/partitioned_aggregation_sink_operator.h | 2 + .../partitioned_aggregation_source_operator.cpp | 8 ++- .../exec/partitioned_aggregation_source_operator.h | 1 + .../exec/partitioned_hash_join_probe_operator.cpp | 41 ++++++++++++-- .../exec/partitioned_hash_join_probe_operator.h | 7 ++- .../exec/partitioned_hash_join_sink_operator.cpp | 7 ++- be/src/pipeline/exec/spill_utils.h | 24 ++++++-- .../exec/streaming_aggregation_operator.cpp | 4 +- be/src/pipeline/pipeline_task.cpp | 15 ++--- be/src/runtime/query_context.cpp | 15 +++-- be/src/runtime/query_context.h | 6 +- .../workload_group/workload_group_manager.cpp | 32 +++++++---- .../workload_group/workload_group_manager.h | 3 +- .../java/org/apache/doris/qe/SessionVariable.java | 1 + 20 files changed, 186 insertions(+), 73 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 9597371057e..b8d38d32674 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -24,6 +24,7 @@ #include 
"pipeline/exec/operator.h" #include "runtime/primitive_type.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" #include "vec/common/hash_table/hash.h" #include "vec/exprs/vectorized_agg_fn.h" @@ -58,8 +59,8 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _agg_data = Base::_shared_state->agg_data.get(); _agg_arena_pool = Base::_shared_state->agg_arena_pool.get(); _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = Base::profile()->AddHighWaterMarkCounter( "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); @@ -76,6 +77,9 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); + _container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage", TUnit::BYTES); + _arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage", TUnit::BYTES); + return Status::OK(); } @@ -226,32 +230,36 @@ size_t AggSinkLocalState::_memory_usage() const { } void AggSinkLocalState::_update_memusage_with_serialized_key() { - std::visit(vectorized::Overload { - [&](std::monostate& arg) -> void { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); - }, - [&](auto& agg_method) -> void { - auto& data = *agg_method.hash_table; - auto arena_memory_usage = - _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage() - - Base::_shared_state->mem_usage_record.used_in_arena; - Base::_mem_tracker->consume(arena_memory_usage); - Base::_mem_tracker->consume( - 
data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - _serialize_key_arena_memory_usage->add(arena_memory_usage); - COUNTER_UPDATE( - _hash_table_memory_usage, - data.get_buffer_size_in_bytes() - - Base::_shared_state->mem_usage_record.used_in_state); - Base::_shared_state->mem_usage_record.used_in_state = - data.get_buffer_size_in_bytes(); - Base::_shared_state->mem_usage_record.used_in_arena = - _agg_arena_pool->size() + - Base::_shared_state->aggregate_data_container->memory_usage(); - }}, - _agg_data->method_variant); + std::visit( + vectorized::Overload { + [&](std::monostate& arg) -> void { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + }, + [&](auto& agg_method) -> void { + auto& data = *agg_method.hash_table; + auto arena_memory_usage = + _agg_arena_pool->size() + + Base::_shared_state->aggregate_data_container->memory_usage() - + Base::_shared_state->mem_usage_record.used_in_arena; + Base::_mem_tracker->consume(arena_memory_usage); + Base::_mem_tracker->consume( + data.get_buffer_size_in_bytes() - + Base::_shared_state->mem_usage_record.used_in_state); + _serialize_key_arena_memory_usage->add(arena_memory_usage); + COUNTER_SET(_container_memory_usage, + Base::_shared_state->aggregate_data_container->memory_usage()); + COUNTER_SET(_arena_memory_usage, + static_cast<int64_t>(_agg_arena_pool->size())); + COUNTER_UPDATE(_hash_table_memory_usage, + data.get_buffer_size_in_bytes() - + Base::_shared_state->mem_usage_record.used_in_state); + Base::_shared_state->mem_usage_record.used_in_state = + data.get_buffer_size_in_bytes(); + Base::_shared_state->mem_usage_record.used_in_arena = + _agg_arena_pool->size() + + Base::_shared_state->aggregate_data_container->memory_usage(); + }}, + _agg_data->method_variant); } Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) { @@ -887,6 +895,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) { auto& ss = 
*local_state.Base::_shared_state; RETURN_IF_ERROR(ss.reset_hash_table()); local_state._agg_arena_pool = ss.agg_arena_pool.get(); + local_state._serialize_key_arena_memory_usage->set(0); return Status::OK(); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 39f11f6270f..2fa2d18a7e6 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -21,6 +21,7 @@ #include "pipeline/exec/operator.h" #include "runtime/exec_env.h" +#include "util/runtime_profile.h" namespace doris::pipeline { @@ -111,6 +112,8 @@ protected: RuntimeProfile::Counter* _max_row_size_counter = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _container_memory_usage = nullptr; + RuntimeProfile::Counter* _arena_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; bool _should_limit_output = false; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 6df089bbb5b..6391be5d70a 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -23,6 +23,7 @@ #include "common/exception.h" #include "pipeline/exec/operator.h" #include "runtime/thread_context.h" +#include "util/runtime_profile.h" #include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vexpr_fwd.h" @@ -44,7 +45,12 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { _deserialize_data_timer = ADD_TIMER(Base::profile(), "DeserializeAndMergeTime"); _hash_table_compute_timer = ADD_TIMER(Base::profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); - _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); 
+ _hash_table_input_counter = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableInputCount", TUnit::UNIT, 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); + _hash_table_size_counter = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize", TUnit::UNIT, 1); auto& p = _parent->template cast<AggSourceOperatorX>(); if (p._without_key) { @@ -626,6 +632,11 @@ void AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place } COUNTER_UPDATE(_hash_table_input_counter, num_rows); + COUNTER_SET(_hash_table_memory_usage, + static_cast<int64_t>( + agg_method.hash_table->get_buffer_size_in_bytes())); + COUNTER_SET(_hash_table_size_counter, + static_cast<int64_t>(agg_method.hash_table->size())); }}, _shared_state->agg_data->method_variant); } diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index a3824a381eb..98ddd6a2142 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -82,6 +82,8 @@ protected: RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; RuntimeProfile::Counter* _hash_table_input_counter = nullptr; + RuntimeProfile::Counter* _hash_table_size_counter = nullptr; + RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _merge_timer = nullptr; RuntimeProfile::Counter* _deserialize_data_timer = nullptr; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index ef706dd0dea..02e538c4ab3 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -74,7 +74,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _build_blocks_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, 
"MemoryUsage", 1); _hash_table_memory_usage = - ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES, "MemoryUsage", 1); + ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); _build_arena_memory_usage = profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES, "MemoryUsage", 1); @@ -337,6 +337,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, _mem_tracker->consume(arg.hash_table->get_byte_size() - old_hash_table_size); _mem_tracker->consume(arg.serialized_keys_size(true) - old_key_size); + COUNTER_SET(_hash_table_memory_usage, + int64_t(arg.hash_table->get_byte_size())); return st; }}, *_shared_state->hash_table_variants, _shared_state->join_op_variants, diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 6d84b8e8bb5..0bed1d9e13d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -107,6 +107,10 @@ void PartitionedAggSinkLocalState::_init_counters() { _hash_table_emplace_timer = ADD_TIMER(Base::profile(), "HashTableEmplaceTime"); _hash_table_input_counter = ADD_COUNTER(Base::profile(), "HashTableInputCount", TUnit::UNIT); _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); + _container_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "ContainerMemoryUsage", TUnit::BYTES, 1); + _arena_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "ArenaMemoryUsage", TUnit::BYTES, 1); COUNTER_SET(_max_row_size_counter, (int64_t)0); _spill_serialize_hash_table_timer = @@ -133,6 +137,8 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime"); UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount"); UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes"); + 
UPDATE_PROFILE(_container_memory_usage, "ContainerMemoryUsage"); + UPDATE_PROFILE(_arena_memory_usage, "ArenaMemoryUsage"); update_max_min_rows_counter(); } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 0027754cde0..22001b752a2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -303,6 +303,8 @@ public: RuntimeProfile::Counter* _deserialize_data_timer = nullptr; RuntimeProfile::Counter* _max_row_size_counter = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; + RuntimeProfile::Counter* _container_memory_usage = nullptr; + RuntimeProfile::Counter* _arena_memory_usage = nullptr; RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index bf7ec22793f..8b281a88684 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -64,13 +64,16 @@ void PartitionedAggLocalState::_init_counters() { _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime"); _insert_keys_to_column_timer = ADD_TIMER(profile(), "InsertKeysToColumnTime"); _serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime"); - _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); + _hash_table_size_counter = ADD_COUNTER_WITH_LEVEL(profile(), "HashTableSize", TUnit::UNIT, 1); _merge_timer = ADD_TIMER(profile(), "MergeTime"); _deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime"); _hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime"); _hash_table_emplace_timer = ADD_TIMER(profile(), 
"HashTableEmplaceTime"); - _hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount", TUnit::UNIT); + _hash_table_input_counter = + ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount", TUnit::UNIT, 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); } #define UPDATE_PROFILE(counter, name) \ @@ -88,6 +91,7 @@ void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE(_insert_keys_to_column_timer, "InsertKeysToColumnTime"); UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime"); UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize"); + UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage"); } Status PartitionedAggLocalState::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index 3505cf7eed8..05f9ff6eff0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -74,6 +74,7 @@ protected: RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; RuntimeProfile::Counter* _hash_table_input_counter = nullptr; + RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; }; class AggSourceOperatorX; class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState> { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index fa9e3ff23b7..4b769544b52 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -17,11 +17,13 @@ #include "partitioned_hash_join_probe_operator.h" +#include <gen_cpp/Metrics_types.h> #include <glog/logging.h> #include "pipeline/pipeline_task.h" #include 
"runtime/fragment_mgr.h" #include "util/mem_info.h" +#include "util/runtime_profile.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { @@ -60,6 +62,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _recovery_probe_blocks = ADD_COUNTER(profile(), "SpillRecoveryProbeBlocks", TUnit::UNIT); _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillRecoveryProbeTime", 1); + _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBlocksBytes", TUnit::BYTES, 1); + _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime", 1); _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 1); @@ -82,6 +86,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _build_expr_call_timer = ADD_CHILD_TIMER(profile(), "BuildExprCallTime", "BuildPhase"); _build_side_compute_hash_timer = ADD_CHILD_TIMER(profile(), "BuildSideHashComputingTime", "BuildPhase"); + + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); + _allocate_resource_timer = ADD_CHILD_TIMER(profile(), "AllocateResourceTime", "BuildPhase"); // Probe phase @@ -158,7 +166,8 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { return Status::OK(); } -Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* state, bool force) { +Status PartitionedHashJoinProbeLocalState::spill_probe_blocks( + RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context, bool force) { auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto query_id = state->query_id(); @@ -212,7 +221,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat return Status::OK(); }; - auto exception_catch_func = [query_id, spill_func, this]() { + auto exception_catch_func = [query_id, spill_func, spill_context, this]() { 
SCOPED_TIMER(_spill_timer); DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel", { ExecEnv::GetInstance()->fragment_mgr()->cancel_query( @@ -228,8 +237,14 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat _spill_status = std::move(status); } _spill_dependency->set_ready(); + if (spill_context) { + spill_context->on_non_sink_task_finished(); + } }; + if (spill_context) { + spill_context->on_non_sink_task_started(); + } _spill_dependency->block(); DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func", { return Status::Error<INTERNAL_ERROR>( @@ -575,6 +590,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } SCOPED_TIMER(local_state._partition_shuffle_timer); + int64_t bytes_of_blocks = 0; for (uint32_t i = 0; i != _partition_count; ++i) { const auto count = partition_indexes[i].size(); if (UNLIKELY(count == 0)) { @@ -592,9 +608,17 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: (eos && partitioned_blocks[i]->rows() > 0)) { local_state._probe_blocks[i].emplace_back(partitioned_blocks[i]->to_block()); partitioned_blocks[i].reset(); + } else { + bytes_of_blocks += partitioned_blocks[i]->allocated_bytes(); + } + + for (auto& block : local_state._probe_blocks[i]) { + bytes_of_blocks += block.allocated_bytes(); } } + COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks); + return Status::OK(); } @@ -604,6 +628,12 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill local_state._runtime_state = std::move(local_state._shared_state->inner_runtime_state); local_state._in_mem_shared_state_sptr = std::move(local_state._shared_state->inner_shared_state); + + auto* sink_state = local_state._runtime_state->get_sink_local_state(); + if (sink_state != nullptr) { + COUNTER_SET(local_state._hash_table_memory_usage, + 
sink_state->profile()->get_counter("HashTableMemoryUsage")->value()); + } return Status::OK(); } @@ -670,6 +700,9 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( << ", partition: " << local_state._partition_cursor << "rows: " << block.rows() << ", usage: " << _inner_sink_operator->get_memory_usage(local_state._runtime_state.get()); + + COUNTER_SET(local_state._hash_table_memory_usage, + sink_local_state->profile()->get_counter("HashTableMemoryUsage")->value()); return Status::OK(); } @@ -805,10 +838,10 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory( return Status::OK(); } - RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true)); + RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context, true)); if (_child) { - return _child->revoke_memory(state, nullptr); + return _child->revoke_memory(state, spill_context); } return Status::OK(); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 621681ca4cf..3effcadfaa1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -47,7 +47,9 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status spill_probe_blocks(RuntimeState* state, bool force = false); + Status spill_probe_blocks(RuntimeState* state, + const std::shared_ptr<SpillContext>& spill_context = nullptr, + bool force = false); Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data); @@ -123,6 +125,9 @@ private: RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr; RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr; + RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; + RuntimeProfile::Counter* _probe_blocks_bytes = nullptr; + RuntimeProfile::Counter* _allocate_resource_timer = nullptr; RuntimeProfile::Counter* 
_probe_phase_label = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 5323d74341a..136466aa6b2 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -287,8 +287,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( Status PartitionedHashJoinSinkLocalState::revoke_memory( RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) { - LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory" - << ", eos: " << _child_eos; + VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << state->task_id() + << " hash join sink " << _parent->node_id() << " revoke_memory" + << ", eos: " << _child_eos; DCHECK_EQ(_spilling_streams_count, 0); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); @@ -450,6 +451,8 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( } } + VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << _state->task_id() + << ", join sink " << _parent->node_id() << " revoke done"; auto num = _spilling_streams_count.fetch_sub(1); DCHECK_GE(_spilling_streams_count, 0); diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index 086a6881fcd..d2b157463ae 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -38,10 +38,10 @@ using SpillPartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillP struct SpillContext { std::atomic_int running_tasks_count; TUniqueId query_id; - std::function<void()> all_tasks_finished_callback; + std::function<void(SpillContext*)> all_tasks_finished_callback; SpillContext(int running_tasks_count_, TUniqueId query_id_, - std::function<void()> all_tasks_finished_callback_) + std::function<void(SpillContext*)> all_tasks_finished_callback_) : 
running_tasks_count(running_tasks_count_), query_id(std::move(query_id_)), all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {} @@ -50,14 +50,30 @@ struct SpillContext { LOG_IF(WARNING, running_tasks_count.load() != 0) << "query: " << print_id(query_id) << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load(); + + LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0) + << "query: " << print_id(query_id) + << " not all spill tasks(non sink tasks) finished, remaining tasks: " + << _running_non_sink_tasks_count.load(); } void on_task_finished() { auto count = running_tasks_count.fetch_sub(1); - if (count == 1) { - all_tasks_finished_callback(); + if (count == 1 && _running_non_sink_tasks_count.load() == 0) { + all_tasks_finished_callback(this); } } + + void on_non_sink_task_started() { _running_non_sink_tasks_count.fetch_add(1); } + void on_non_sink_task_finished() { + const auto count = _running_non_sink_tasks_count.fetch_sub(1); + if (count == 1 && running_tasks_count.load() == 0) { + all_tasks_finished_callback(this); + } + } + +private: + std::atomic_int _running_non_sink_tasks_count {0}; }; class SpillRunnable : public Runnable { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 41424914edc..a959b8c1456 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -87,8 +87,8 @@ Status StreamingAggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(Base::exec_time_counter()); SCOPED_TIMER(Base::_init_timer); - _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "HashTable", - TUnit::BYTES, "MemoryUsage", 1); + _hash_table_memory_usage = + ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage", TUnit::BYTES, 1); _serialize_key_arena_memory_usage = 
Base::profile()->AddHighWaterMarkCounter( "SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 0028614b22a..aa2b01b741a 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -388,7 +388,7 @@ Status PipelineTask::execute(bool* eos) { if (!st.ok()) { LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: " << reserve_size << "(sink reserve size:(" - << sink_reserve_size << " )" + << sink_reserve_size << ")" << ", sink name: " << _sink->get_name() << ", node id: " << _sink->node_id() << " failed: " << st.to_string() << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str(); @@ -547,19 +547,20 @@ size_t PipelineTask::get_revocable_size() const { return 0; } - auto revocable_size = _root->revocable_mem_size(_state); - revocable_size += _sink->revocable_mem_size(_state); - - return revocable_size; + return _sink->revocable_mem_size(_state) + _root->revocable_mem_size(_state); } Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) { - if (_sink->revocable_mem_size(_state) >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context)); + + const auto revocable_size = _sink->revocable_mem_size(_state); + if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context)); } else if (spill_context) { spill_context->on_task_finished(); + LOG(INFO) << "query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) + << " has not enough data to revoke: " << revocable_size; } - return Status::OK(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 51e01a071bb..af4cd3d417d 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -486,26 +486,25 @@ Status QueryContext::revoke_memory() { } 
std::weak_ptr<QueryContext> this_ctx = shared_from_this(); - auto spill_context = - std::make_shared<pipeline::SpillContext>(chosen_tasks.size(), _query_id, [this_ctx] { + auto spill_context = std::make_shared<pipeline::SpillContext>( + chosen_tasks.size(), _query_id, [this_ctx](pipeline::SpillContext* context) { auto query_context = this_ctx.lock(); if (!query_context) { return; } LOG(INFO) << "query: " << print_id(query_context->_query_id) - << " all revoking tasks done, resume it."; + << ", context: " << ((void*)context) + << " all revoking tasks done, resumt it."; query_context->set_memory_sufficient(true); }); + LOG(INFO) << "query: " << print_id(_query_id) << ", context: " << ((void*)spill_context.get()) + << " total revoked size: " << revoked_size << ", tasks count: " << chosen_tasks.size() + << "/" << tasks.size(); for (auto* task : chosen_tasks) { RETURN_IF_ERROR(task->revoke_memory(spill_context)); } - - LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: " << revoked_size - << ", target_size: " << PrettyPrinter::print(target_revoking_size, TUnit::BYTES) - << ", tasks count: " << chosen_tasks.size() << "/" << tasks.size(); - return Status::OK(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 4ad946562bb..299e4ced55c 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -245,13 +245,15 @@ public: query_mem_tracker->set_limit(std::min<int64_t>(new_mem_limit, _user_set_mem_limit)); } + int64_t get_mem_limit() const { return query_mem_tracker->limit(); } + std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return query_mem_tracker; } - int32_t get_slot_count() { + int32_t get_slot_count() const { return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1; } - bool enable_query_slot_hard_limit() { + bool enable_query_slot_hard_limit() const { return _query_options.__isset.enable_query_slot_hard_limit ? 
_query_options.enable_query_slot_hard_limit : false; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index df65124f635..287a6b45729 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -328,7 +328,8 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { } if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { - bool spill_res = handle_single_query(query_ctx, query_ctx->paused_reason()); + bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, + query_ctx->paused_reason()); if (!spill_res) { ++query_it; continue; @@ -412,7 +413,8 @@ void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() { } if (query_it->cache_ratio_ < 0.001) { // TODO: Find other exceed limit workload group and cancel query. - bool spill_res = handle_single_query(query_ctx, query_ctx->paused_reason()); + bool spill_res = handle_single_query(query_ctx, query_it->reserve_size_, + query_ctx->paused_reason()); if (!spill_res) { ++query_it; continue; @@ -473,11 +475,12 @@ void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() { // If the query could not release memory, then cancel the query, the return value is true. // If the query is not ready to do these tasks, it means just wait. bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_ctx, - Status paused_reason) { + size_t size_to_reserve, Status paused_reason) { // TODO: If the query is an insert into select query, should consider memtable as revoke memory. 
size_t revocable_size = 0; size_t memory_usage = 0; bool has_running_task = false; + const auto query_id = print_id(query_ctx->query_id()); query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task); if (has_running_task) { LOG(INFO) << "query: " << print_id(query_ctx->query_id()) @@ -488,16 +491,25 @@ bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c auto revocable_tasks = query_ctx->get_revocable_tasks(); if (revocable_tasks.empty()) { if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { - // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "query reserve memory failed, but could not find memory that " - "could " - "release or spill to disk")); + const auto limit = query_ctx->get_mem_limit(); + if ((memory_usage + size_to_reserve) < limit) { + LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + " + << size_to_reserve << ") less than limit(" << limit << "), resume it."; + query_ctx->set_memory_sufficient(true); + return true; + } else { + // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic + query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( + "query({}) reserve memory failed, but could not find memory that could " + "release or spill to disk(usage:{}, limit: {})", + query_id, memory_usage, query_ctx->get_mem_limit())); + } } else { query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>( - "The query reserved memory failed because process limit exceeded, and " + "The query({}) reserved memory failed because process limit exceeded, and " "there is no cache now. And could not find task to spill. 
Maybe you should set " - "the workload group's limit to a lower value.")); + "the workload group's limit to a lower value.", + query_id)); } } else { SCOPED_ATTACH_TASK(query_ctx.get()); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 8f69d5653b4..03f134006f5 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -106,7 +106,8 @@ public: void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& wg_memtable_usages); private: - bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, Status paused_reason); + bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve, + Status paused_reason); void handle_non_overcommit_wg_paused_queries(); void handle_overcommit_wg_paused_queries(); void change_query_to_hard_limit(WorkloadGroupPtr wg, bool enable_hard_limit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 125cacd9353..a9c9ab2a2ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -3793,6 +3793,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableSortSpill(enableSortSpill); tResult.setEnableAggSpill(enableAggSpill); tResult.setEnableForceSpill(enableForceSpill); + tResult.setExternalAggPartitionBits(externalAggPartitionBits); tResult.setMinRevocableMem(minRevocableMem); tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org