This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 41bb04b9e9e refactor logic of revoking and low memory mode in local exchange oper… (#41264)

41bb04b9e9e is described below

commit 41bb04b9e9e237fd5680674d924228be7aa3b0e6
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Thu Sep 26 10:12:36 2024 +0800

    refactor logic of revoking and low memory mode in local exchange oper… (#41264)

    …ator

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       | 21 ++++++-
 be/src/pipeline/exec/operator.h                    | 37 ++++++++++++-
 .../exec/partitioned_aggregation_sink_operator.cpp | 29 ++++++++--
 .../exec/partitioned_aggregation_sink_operator.h   |  8 ++-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  6 +-
 .../exec/partitioned_hash_join_probe_operator.h    |  3 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 59 ++++++++++++++------
 .../exec/partitioned_hash_join_sink_operator.h     | 11 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 19 ++++---
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  7 ++-
 be/src/pipeline/exec/spill_utils.h                 | 32 +++++++++++
 .../local_exchange_sink_operator.cpp               |  8 ++-
 be/src/pipeline/local_exchange/local_exchanger.cpp | 49 +++++++++++++----
 be/src/pipeline/pipeline_task.cpp                  |  9 ++-
 be/src/pipeline/pipeline_task.h                    |  2 +-
 be/src/runtime/query_context.cpp                   | 64 ++++++++++++++++++++++
 be/src/runtime/query_context.h                     |  7 ++-
 .../workload_group/workload_group_manager.cpp      | 38 ++++++------
 18 files changed, 325 insertions(+), 84 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 8cb479ccbb0..5f030dda5d2 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -26,6 +26,7 @@
 #include <thread>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "concurrentqueue.h"
 #include "gutil/integral_types.h"
@@ -832,6 +833,7 @@ public:
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
     std::atomic<int64_t> mem_usage = 0;
+    size_t _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
     virtual void create_dependencies(int local_exchange_id) {
@@ -875,7 +877,7 @@ public:
     void sub_mem_usage(int channel_id, size_t delta) { mem_trackers[channel_id]->release(delta); }
 
     virtual void add_total_mem_usage(size_t delta, int channel_id) {
-        if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
+        if (mem_usage.fetch_add(delta) + delta > _buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
@@ -884,10 +886,15 @@ public:
         auto prev_usage = mem_usage.fetch_sub(delta);
         DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
                                          << " channel_id: " << channel_id;
-        if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
+        if (prev_usage - delta <= _buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
     }
+
+    virtual void set_low_memory_mode() {
+        _buffer_mem_limit =
+                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
+    }
 };
 
 struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
@@ -933,6 +940,14 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
         source_deps[channel_id]->set_ready();
     }
 
+    void set_low_memory_mode() override {
+        _buffer_mem_limit =
+                std::min<int64_t>(config::local_exchange_buffer_mem_limit, 10 * 1024 * 1024);
+        DCHECK(!_queues_mem_usage.empty());
+        _each_queue_limit =
+                std::max<int64_t>(64 * 1024, _buffer_mem_limit / _queues_mem_usage.size());
+    }
+
     Dependency* get_sink_dep_by_channel_id(int channel_id) override {
         return sink_deps[channel_id].get();
     }
@@ -943,7 +958,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
 
 private:
     std::vector<std::atomic_int64_t> _queues_mem_usage;
-    const int64_t _each_queue_limit;
+    int64_t _each_queue_limit;
 };
 } // namespace doris::pipeline
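The low memory mode added above amounts to two clamps: the shared exchange buffer is capped at 10 MB, and each merge-exchange queue keeps at least 64 KB of budget. A minimal sketch of just that arithmetic, using hypothetical names in place of the LocalExchangeSharedState members:

    #include <algorithm>
    #include <cstdint>

    // Hypothetical stand-ins for config::local_exchange_buffer_mem_limit and
    // LocalMergeExchangeSharedState::_queues_mem_usage.size().
    int64_t low_memory_buffer_limit(int64_t config_limit) {
        // The whole exchange buffer is capped at 10 MB in low memory mode.
        return std::min<int64_t>(config_limit, 10 * 1024 * 1024);
    }

    int64_t low_memory_queue_limit(int64_t buffer_limit, int64_t queue_count) {
        // Each merge queue keeps at least 64 KB so it can still accept a block.
        return std::max<int64_t>(64 * 1024, buffer_limit / queue_count);
    }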
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 3a644eb4f02..1d2dbcc3592 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -32,6 +32,7 @@
 #include "common/status.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "pipeline/local_exchange/local_exchanger.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/query_context.h"
@@ -113,7 +114,11 @@ public:
         return state->minimum_operator_memory_required_bytes();
     }
 
-    virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    virtual Status revoke_memory(RuntimeState* state,
+                                 const std::shared_ptr<SpillContext>& spill_context) {
+        return Status::OK();
+    }
+
     [[nodiscard]] virtual bool require_data_distribution() const { return false; }
     OperatorPtr child() { return _child; }
     [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
@@ -603,6 +608,10 @@ public:
         _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", 1);
         _spill_read_wait_io_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", 1);
+        _spill_max_rows_of_partition =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
+        _spill_min_rows_of_partition =
+                ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMinRowsOfPartition", TUnit::UNIT, 1);
         return Status::OK();
     }
 
@@ -611,6 +620,25 @@ public:
         return dependencies;
     }
 
+    void update_max_min_rows_counter() {
+        int64_t max_rows = 0;
+        int64_t min_rows = std::numeric_limits<int64_t>::max();
+
+        for (auto rows : _rows_in_partitions) {
+            if (rows > max_rows) {
+                max_rows = rows;
+            }
+            if (rows < min_rows) {
+                min_rows = rows;
+            }
+        }
+
+        COUNTER_SET(_spill_max_rows_of_partition, max_rows);
+        COUNTER_SET(_spill_min_rows_of_partition, min_rows);
+    }
+
+    std::vector<int64_t> _rows_in_partitions;
+
     RuntimeProfile::Counter* _spill_timer = nullptr;
     RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
@@ -619,6 +647,8 @@ public:
     RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
     RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr;
+    RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -718,9 +748,10 @@ public:
         return (_child and !is_source()) ? _child->revocable_mem_size(state) : 0;
     }
 
-    Status revoke_memory(RuntimeState* state) override {
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override {
         if (_child and !is_source()) {
-            return _child->revoke_memory(state);
+            return _child->revoke_memory(state, spill_context);
         }
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 39311a62dcc..6d84b8e8bb5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "partitioned_aggregation_sink_operator.h"
 
 #include <cstdint>
+#include <limits>
 #include <memory>
 
 #include "aggregation_sink_operator.h"
@@ -26,6 +27,8 @@
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -60,6 +63,8 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
         value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
     }
 
+    _rows_in_partitions.assign(Base::_shared_state->partition_count, 0);
+
     _spill_dependency = Dependency::create_shared(parent.operator_id(), parent.node_id(),
                                                   "AggSinkSpillDependency", true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -128,6 +133,8 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
     UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
     UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
     UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
+
+    update_max_min_rows_counter();
 }
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
@@ -172,7 +179,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 for (auto& partition : local_state._shared_state->spill_partitions) {
                     RETURN_IF_ERROR(partition->finish_current_spilling(eos));
@@ -184,6 +191,10 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
             local_state._dependency->set_ready_to_read();
             local_state._finish_dependency->set_ready();
         }
+    } else if (local_state._shared_state->is_spilled) {
+        if (revocable_mem_size(state) >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            return revoke_memory(state, nullptr);
+        }
     }
     if (local_state._runtime_state) {
         auto* sink_local_state = local_state._runtime_state->get_sink_local_state();
@@ -191,9 +202,10 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     }
     return Status::OK();
 }
-Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -240,7 +252,8 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
     return _agg_sink_operator->get_reserve_mem_size(runtime_state);
 }
 
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkLocalState::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
               << Base::_parent->node_id()
@@ -279,7 +292,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     state->get_query_ctx()->increase_revoking_tasks_count();
     auto spill_runnable = std::make_shared<SpillRunnable>(
             state, _shared_state->shared_from_this(),
-            [this, &parent, state, query_id, size_to_revoke, submit_timer] {
+            [this, &parent, state, query_id, size_to_revoke, spill_context, submit_timer] {
                 DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     auto st = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
@@ -308,8 +321,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                         Base::_dependency->set_ready_to_read();
                         _finish_dependency->set_ready();
                     }
-                    Base::_spill_dependency->Dependency::set_ready();
                     state->get_query_ctx()->decrease_revoking_tasks_count();
+                    Base::_spill_dependency->Dependency::set_ready();
+
+                    if (spill_context) {
+                        spill_context->on_task_finished();
+                    }
                 }};
     auto* runtime_state = _runtime_state.get();
     auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 756b686a5b3..0027754cde0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -16,6 +16,7 @@
 // under the License.
 
 #pragma once
 
+#include <limits>
 #include <memory>
 
 #include "aggregation_sink_operator.h"
@@ -45,7 +46,7 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     Dependency* finishdependency() override { return _finish_dependency.get(); }
 
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
 
     Status setup_in_memory_agg_op(RuntimeState* state);
 
@@ -102,6 +103,7 @@ public:
         for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) {
             if (spill_infos[i].keys_.size() >= spill_batch_rows) {
+                _rows_in_partitions[i] += spill_infos[i].keys_.size();
                 status = _spill_partition(
                         state, context, Base::_shared_state->spill_partitions[i],
                         spill_infos[i].keys_, spill_infos[i].values_, nullptr, false);
@@ -117,6 +119,7 @@ public:
             auto spill_null_key_data =
                     (hash_null_key_data && i == Base::_shared_state->partition_count - 1);
             if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) {
+                _rows_in_partitions[i] += spill_infos[i].keys_.size();
                 status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i],
                                           spill_infos[i].keys_, spill_infos[i].values_,
                                           spill_null_key_data
@@ -338,7 +341,8 @@ public:
     }
 
     size_t revocable_mem_size(RuntimeState* state) const override;
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f95cca1f6af..fa9e3ff23b7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -167,6 +167,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
+
     auto spill_func = [query_id, state, submit_timer, spill_size_threshold, this] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_probe_timer);
@@ -791,7 +792,8 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
     return mem_size;
 }
 
-Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinProbeOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
               << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
@@ -806,7 +808,7 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
     RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
 
     if (_child) {
-        return _child->revoke_memory(state);
+        return _child->revoke_memory(state, nullptr);
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b611fb661af..621681ca4cf 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -186,7 +186,8 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
 private:
     Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f2375632d2a..5323d74341a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -20,9 +20,11 @@
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <memory>
 
 #include "common/logging.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
@@ -41,6 +43,8 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _shared_state->partitioned_build_blocks.resize(p._partition_count);
     _shared_state->spilled_streams.resize(p._partition_count);
 
+    _rows_in_partitions.assign(p._partition_count, 0);
+
     _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                   "HashJoinBuildSpillDependency", true);
     state->get_task()->add_spill_dependency(_spill_dependency.get());
@@ -125,16 +129,17 @@ size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
     return size_to_reserve;
 }
 
-Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->inner_shared_state->hash_table_variants.reset();
     auto row_desc = p._child->row_desc();
     const auto num_slots = row_desc.num_slots();
     vectorized::Block build_block;
     size_t block_old_mem = 0;
-    auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
+    auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
     if (inner_sink_state_) {
-        auto inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+        auto* inner_sink_state = assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
         build_block = inner_sink_state->_build_side_mutable_block.to_block();
         block_old_mem = build_block.allocated_bytes();
     }
@@ -142,6 +147,9 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     if (build_block.rows() <= 1) {
         LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
                      << ", task: " << state->task_id();
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
         return Status::OK();
     }
@@ -243,7 +251,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         _spill_dependency->set_ready();
     };
 
-    auto exception_catch_func = [spill_func, this]() mutable {
+    auto exception_catch_func = [spill_func, spill_context, this]() mutable {
         SCOPED_TIMER(_spill_timer);
         auto status = [&]() {
             RETURN_IF_CATCH_EXCEPTION(spill_func());
@@ -256,6 +264,10 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
             _spill_status_ok = false;
             _spill_dependency->set_ready();
         }
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     };
 
     auto spill_runnable = std::make_shared<SpillRunnable>(state, _shared_state->shared_from_this(),
@@ -273,7 +285,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     return thread_pool->submit(std::move(spill_runnable));
 }
 
-Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkLocalState::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
               << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
@@ -282,7 +295,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
     if (!_shared_state->need_to_spill) {
         profile()->add_info_string("Spilled", "true");
         _shared_state->need_to_spill = true;
-        return _revoke_unpartitioned_block(state);
+        return _revoke_unpartitioned_block(state, spill_context);
     }
 
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -317,8 +330,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         state->get_query_ctx()->increase_revoking_tasks_count();
         auto spill_runnable = std::make_shared<SpillRunnable>(
                 state, _shared_state->shared_from_this(),
-                [this, state, query_id, spilling_stream, i, submit_timer] {
-                    SCOPED_TIMER(_spill_timer);
+                [this, query_id, spilling_stream, i, submit_timer, spill_context] {
                     DBUG_EXECUTE_IF(
                             "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
                                 ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -331,7 +343,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                     SCOPED_TIMER(_spill_build_timer);
 
                     auto status = [&]() {
-                        RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream));
+                        RETURN_IF_CATCH_EXCEPTION(
+                                _spill_to_disk(i, spilling_stream, spill_context));
                         return Status::OK();
                     }();
 
@@ -341,8 +354,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                         _spill_status_ok = false;
                         _spill_status = std::move(status);
                     }
-
-                    state->get_query_ctx()->decrease_revoking_tasks_count();
                 });
         if (st.ok()) {
             st = spill_io_pool->submit(std::move(spill_runnable));
@@ -368,6 +379,10 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
             }
         });
         _dependency->set_ready_to_read();
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     }
     return Status::OK();
 }
@@ -408,13 +423,17 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
         }
         RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block, partition_indexes[i].data(),
                                                         partition_indexes[i].data() + count));
+        _rows_in_partitions[i] += count;
     }
 
+    update_max_min_rows_counter();
+
     return Status::OK();
 }
 
 void PartitionedHashJoinSinkLocalState::_spill_to_disk(
-        uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream) {
+        uint32_t partition_index, const vectorized::SpillStreamSPtr& spilling_stream,
+        const std::shared_ptr<SpillContext>& spill_context) {
     auto& partitioned_block = _shared_state->partitioned_build_blocks[partition_index];
 
     if (_spill_status_ok) {
@@ -436,6 +455,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
 
     if (num == 1) {
         std::unique_lock<std::mutex> lock(_spill_lock);
+        _state->get_query_ctx()->decrease_revoking_tasks_count();
        _spill_dependency->set_ready();
         if (_child_eos) {
             VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
@@ -449,6 +469,10 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
             });
             _dependency->set_ready_to_read();
         }
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     }
 }
 
@@ -575,7 +599,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                       << _inner_sink_operator->get_memory_usage(
                                  local_state._shared_state->inner_runtime_state.get());
         } else {
-            return revoke_memory(state);
+            return revoke_memory(state, nullptr);
         }
 
         std::for_each(local_state._shared_state->partitioned_build_blocks.begin(),
@@ -594,9 +618,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     if (need_to_spill) {
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
         if (eos) {
-            return revoke_memory(state);
+            return revoke_memory(state, nullptr);
         } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
-            return revoke_memory(state);
+            return revoke_memory(state, nullptr);
         }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -630,10 +654,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
     return local_state.revocable_mem_size(state);
 }
 
-Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status PartitionedHashJoinSinkOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index b2c79967b97..a2d75cf2f9b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -43,7 +43,7 @@ public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
     size_t revocable_mem_size(RuntimeState* state) const;
     [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
 
@@ -52,12 +52,14 @@ protected:
             : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
 
     void _spill_to_disk(uint32_t partition_index,
-                        const vectorized::SpillStreamSPtr& spilling_stream);
+                        const vectorized::SpillStreamSPtr& spilling_stream,
+                        const std::shared_ptr<SpillContext>& spill_context);
 
     Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin,
                             size_t end);
 
-    Status _revoke_unpartitioned_block(RuntimeState* state);
+    Status _revoke_unpartitioned_block(RuntimeState* state,
+                                       const std::shared_ptr<SpillContext>& spill_context);
 
     friend class PartitionedHashJoinSinkOperatorX;
 
@@ -102,7 +104,8 @@ public:
 
     size_t revocable_mem_size(RuntimeState* state) const override;
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index e55982bcb3b..e83ad897257 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -131,9 +131,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     return _sort_sink_operator->open(state);
 }
 
-Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
+                                             const std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
-    return local_state.revoke_memory(state);
+    return local_state.revoke_memory(state, spill_context);
 }
 
 size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
@@ -163,7 +164,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     if (eos) {
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+                RETURN_IF_ERROR(revoke_memory(state, nullptr));
             } else {
                 local_state._dependency->set_ready_to_read();
                 local_state._finish_dependency->set_ready();
@@ -178,7 +179,8 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
     return Status::OK();
 }
 
-Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
+Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
+                                              const std::shared_ptr<SpillContext>& spill_context) {
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
@@ -234,14 +236,13 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         }
         _spilling_stream.reset();
+        state->get_query_ctx()->decrease_revoking_tasks_count();
         if (_eos) {
             _dependency->set_ready_to_read();
             _finish_dependency->set_ready();
         } else {
             _spill_dependency->Dependency::set_ready();
         }
-
-        state->get_query_ctx()->decrease_revoking_tasks_count();
     }};
 
     _shared_state->sink_status =
@@ -273,7 +274,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         return Status::OK();
     };
 
-    auto exception_catch_func = [this, query_id, spill_func]() {
+    auto exception_catch_func = [this, query_id, spill_context, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", {
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     query_id, Status::InternalError("fault_inject spill_sort_sink "
@@ -284,6 +285,10 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
         _shared_state->sink_status = [&]() {
             RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
         }();
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
     };
 
     DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index e74b5d2a414..173e3c7847c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <memory>
+
 #include "operator.h"
 #include "sort_sink_operator.h"
 
@@ -38,7 +40,7 @@ public:
     Dependency* finishdependency() override { return _finish_dependency.get(); }
     Status setup_in_memory_sort_op(RuntimeState* state);
-    Status revoke_memory(RuntimeState* state);
+    Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);
 
 private:
     void _init_counters();
@@ -86,7 +88,8 @@ public:
 
     size_t revocable_mem_size(RuntimeState* state) const override;
 
-    Status revoke_memory(RuntimeState* state) override;
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) override;
 
     using DataSinkOperatorX<LocalStateType>::node_id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index 925e7df44e6..086a6881fcd 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -17,6 +17,13 @@
 
 #pragma once
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <functional>
+#include <utility>
+
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
@@ -28,6 +35,31 @@ namespace doris::pipeline {
 using SpillPartitionerType = vectorized::Crc32HashPartitioner<vectorized::SpillPartitionChannelIds>;
 
+struct SpillContext {
+    std::atomic_int running_tasks_count;
+    TUniqueId query_id;
+    std::function<void()> all_tasks_finished_callback;
+
+    SpillContext(int running_tasks_count_, TUniqueId query_id_,
+                 std::function<void()> all_tasks_finished_callback_)
+            : running_tasks_count(running_tasks_count_),
+              query_id(std::move(query_id_)),
+              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
+
+    ~SpillContext() {
+        LOG_IF(WARNING, running_tasks_count.load() != 0)
+                << "query: " << print_id(query_id)
+                << " not all spill tasks finished, remaining tasks: " << running_tasks_count.load();
+    }
+
+    void on_task_finished() {
+        auto count = running_tasks_count.fetch_sub(1);
+        if (count == 1) {
+            all_tasks_finished_callback();
+        }
+    }
+};
+
 class SpillRunnable : public Runnable {
 public:
     SpillRunnable(RuntimeState* state, const std::shared_ptr<BasicSharedState>& shared_state,
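The SpillContext introduced above behaves like a countdown latch: each spill task calls on_task_finished() exactly once, and because fetch_sub returns the previous value, only the last finisher runs the callback. A self-contained sketch of the same pattern under those assumptions, with hypothetical names rather than the Doris types:

    #include <atomic>
    #include <functional>
    #include <iostream>
    #include <thread>
    #include <vector>

    struct CountdownContext {
        std::atomic_int remaining;
        std::function<void()> on_all_done;

        void on_task_finished() {
            // fetch_sub returns the old value, so exactly one caller sees 1
            // and fires the callback, no matter how the threads interleave.
            if (remaining.fetch_sub(1) == 1) {
                on_all_done();
            }
        }
    };

    int main() {
        CountdownContext ctx {3, [] { std::cout << "all spill tasks done\n"; }};
        std::vector<std::thread> workers;
        for (int i = 0; i < 3; ++i) {
            workers.emplace_back([&ctx] { ctx.on_task_finished(); });
        }
        for (auto& t : workers) {
            t.join();
        }
    }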
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 19c37f3649b..ec0d1a40f5e 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -28,7 +28,7 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default;
 
 std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
     auto deps = Base::dependencies();
-    auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
+    auto* dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
     if (dep != nullptr) {
         deps.push_back(dep);
     }
@@ -136,7 +136,13 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+
+    if (state->get_query_ctx()->low_memory_mode()) {
+        local_state._shared_state->set_low_memory_mode();
+    }
+
     RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state));
+    local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index f4630f328bb..50f359922a1 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -43,13 +43,17 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
         allocated_bytes = block->data_block.allocated_bytes();
     }
     std::unique_lock l(_m);
-    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
-                                             !std::is_same_v<PartitionedBlock, BlockType> &&
-                                                     !std::is_same_v<BroadcastBlock, BlockType>);
+    constexpr bool update_mem_usage = !std::is_same_v<PartitionedBlock, BlockType> &&
+                                      !std::is_same_v<BroadcastBlock, BlockType>;
+    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes, update_mem_usage);
+    if constexpr (update_mem_usage) {
+        local_state._mem_tracker->consume(allocated_bytes);
+    }
     if (_data_queue[channel_id].enqueue(std::move(block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
     } else {
         local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+        local_state._mem_tracker->release(allocated_bytes);
         // `enqueue(block)` return false iff this queue's source operator is already closed so we
         // just unref the block.
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
@@ -77,11 +81,13 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
     if (_data_queue[channel_id].try_dequeue(block)) {
         if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                       std::is_same_v<BroadcastBlock, BlockType>) {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block.first->data_block.allocated_bytes());
+            const auto bytes = block.first->data_block.allocated_bytes();
+            local_state._shared_state->sub_mem_usage(channel_id, bytes);
+            local_state._mem_tracker->release(bytes);
         } else {
-            local_state._shared_state->sub_mem_usage(channel_id,
-                                                     block->data_block.allocated_bytes());
+            const auto bytes = block->data_block.allocated_bytes();
+            local_state._shared_state->sub_mem_usage(channel_id, bytes);
+            local_state._mem_tracker->release(bytes);
             data_block->swap(block->data_block);
             block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
             DCHECK_EQ(block->ref_value(), 0);
@@ -94,11 +100,15 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
         if (_data_queue[channel_id].try_dequeue(block)) {
             if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
                           std::is_same_v<BroadcastBlock, BlockType>) {
+                const auto bytes = block.first->data_block.allocated_bytes();
                 local_state._shared_state->sub_mem_usage(channel_id,
                                                          block.first->data_block.allocated_bytes());
+                local_state._mem_tracker->release(bytes);
             } else {
+                const auto bytes = block->data_block.allocated_bytes();
                 local_state._shared_state->sub_mem_usage(channel_id,
                                                          block->data_block.allocated_bytes());
+                local_state._mem_tracker->release(bytes);
                 data_block->swap(block->data_block);
                 block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id);
                 DCHECK_EQ(block->ref_value(), 0);
@@ -128,6 +138,8 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
                                  in_block, local_state));
     }
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -200,8 +212,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     if (new_block_wrapper->data_block.empty()) {
         return Status::OK();
     }
-    local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+
+    const auto block_bytes = new_block_wrapper->data_block.allocated_bytes();
+    local_state._shared_state->add_total_mem_usage(block_bytes, local_state._channel_id);
+    local_state._mem_tracker->consume(block_bytes);
     auto bucket_seq_to_instance_idx =
             local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
@@ -272,6 +286,8 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -316,6 +332,8 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block));
     _enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+
     return Status::OK();
 }
 
@@ -346,6 +364,8 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
     if (eos) {
         local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
     }
+
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
     return Status::OK();
 }
 
@@ -424,8 +444,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     }
     new_block.swap(*in_block);
     auto wrapper = BlockWrapper::create_shared(std::move(new_block));
-    local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(),
-                                                   local_state._channel_id);
+    const auto block_bytes = wrapper->data_block.allocated_bytes();
+    local_state._shared_state->add_total_mem_usage(block_bytes, local_state._channel_id);
+    local_state._mem_tracker->consume(block_bytes);
     wrapper->ref(_num_partitions);
     for (size_t i = 0; i < _num_partitions; i++) {
         _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}});
@@ -475,6 +496,7 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     _enqueue_data_and_set_ready(channel_id, local_state,
                                 BlockWrapper::create_shared(std::move(new_block)));
 
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
     return Status::OK();
 }
 
@@ -494,7 +516,10 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz
             std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
         }
     }
-    return _split_rows(state, channel_ids.data(), block, local_state);
+
+    RETURN_IF_ERROR(_split_rows(state, channel_ids.data(), block, local_state));
+    local_state._memory_used_counter->set(local_state._shared_state->mem_usage);
+    return Status::OK();
 }
 
 Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 356feea9c36..4a1e4536373 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -553,11 +553,14 @@ size_t PipelineTask::get_revocable_size() const {
     return revocable_size;
 }
 
-Status PipelineTask::revoke_memory() {
+Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
     if (_sink->revocable_mem_size(_state) >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+        RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
+    } else if (spill_context) {
+        spill_context->on_task_finished();
     }
-    return _root->revoke_memory(_state);
+
+    return Status::OK();
 }
 
 void PipelineTask::wake_up() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 943366b4b70..633cca93f46 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -248,7 +248,7 @@ public:
     }
 
     [[nodiscard]] size_t get_revocable_size() const;
-    [[nodiscard]] Status revoke_memory();
+    [[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>& spill_context);
 
     void add_spill_dependency(Dependency* dependency) {
         _spill_dependencies.emplace_back(dependency);
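Note the contract this gives PipelineTask::revoke_memory: a task handed a SpillContext must report completion exactly once, whether the sink spills asynchronously or the task decides it has too little revocable memory to bother. A rough self-contained sketch of that shape (stand-in types throughout, and the 32 MB threshold is an assumption, not the actual value of MIN_SPILL_WRITE_BATCH_MEM):

    #include <cstddef>
    #include <memory>

    // Stand-ins for Doris's Status, SpillContext, and sink operator types.
    struct Status { static Status OK() { return {}; } };
    struct SpillContext { void on_task_finished() { /* countdown */ } };

    constexpr size_t kMinSpillWriteBatchMem = 32 * 1024 * 1024; // assumed value

    struct Sink {
        size_t revocable_bytes = 0;
        Status revoke_memory(const std::shared_ptr<SpillContext>& ctx) {
            // The real sink spills on a thread pool and calls
            // ctx->on_task_finished() when the disk write completes.
            if (ctx) ctx->on_task_finished();
            return Status::OK();
        }
    };

    Status revoke_memory_sketch(Sink& sink, const std::shared_ptr<SpillContext>& ctx) {
        if (sink.revocable_bytes >= kMinSpillWriteBatchMem) {
            return sink.revoke_memory(ctx); // async path reports completion later
        }
        if (ctx) {
            // Nothing worth spilling: report immediately, otherwise the
            // all-tasks-finished callback would never fire.
            ctx->on_task_finished();
        }
        return Status::OK();
    }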
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 6a0cf20a4dd..45fd5562a93 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -23,13 +23,17 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 
+#include <algorithm>
 #include <exception>
 #include <memory>
 #include <mutex>
+#include <shared_mutex>
 #include <sstream>
 #include <utility>
+#include <vector>
 
 #include "common/logging.h"
+#include "common/status.h"
 #include "olap/olap_common.h"
 #include "pipeline/dependency.h"
 #include "pipeline/pipeline_fragment_context.h"
@@ -442,6 +446,66 @@ size_t QueryContext::get_revocable_size() const {
     return revocable_size;
 }
 
+Status QueryContext::revoke_memory() {
+    std::vector<std::pair<size_t, pipeline::PipelineTask*>> tasks;
+    std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> fragments;
+    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+        auto fragment_ctx = fragment_wptr.lock();
+        if (!fragment_ctx) {
+            continue;
+        }
+
+        auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
+        for (auto* task : tasks_of_fragment) {
+            tasks.emplace_back(task->get_revocable_size(), task);
+        }
+        fragments.emplace_back(std::move(fragment_ctx));
+    }
+
+    std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return l.first > r.first; });
+
+    const auto mem_limit = query_mem_tracker->limit();
+    const auto target_revoking_size = mem_limit * 0.2;
+    size_t revoked_size = 0;
+
+    std::vector<pipeline::PipelineTask*> chosen_tasks;
+    for (auto&& [revocable_size, task] : tasks) {
+        chosen_tasks.emplace_back(task);
+
+        revoked_size += revocable_size;
+        if (revoked_size >= target_revoking_size) {
+            break;
+        }
+    }
+
+    std::weak_ptr<QueryContext> this_ctx = shared_from_this();
+    auto spill_context =
+            std::make_shared<pipeline::SpillContext>(chosen_tasks.size(), _query_id, [this_ctx] {
+                auto query_context = this_ctx.lock();
+                if (!query_context) {
+                    return;
+                }
+
+                LOG(INFO) << "query: " << print_id(query_context->_query_id)
+                          << " all revoking tasks done, resume it.";
+                query_context->set_memory_sufficient(true);
+            });
+
+    for (auto* task : chosen_tasks) {
+        RETURN_IF_ERROR(task->revoke_memory(spill_context));
+    }
+
+    LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: " << revoked_size
+              << ", target_size: " << target_revoking_size
+              << ", tasks count: " << chosen_tasks.size() << "/" << tasks.size();
+
+    return Status::OK();
+}
+
+void QueryContext::decrease_revoking_tasks_count() {
+    _revoking_tasks_count.fetch_sub(1);
+}
+
 std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
     std::vector<pipeline::PipelineTask*> tasks;
     for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
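The selection policy in QueryContext::revoke_memory above is a plain greedy cut: sort candidate tasks by revocable size, largest first, and keep taking tasks until roughly 20% of the query memory limit is covered. The same policy isolated as a sketch, with a hypothetical Task handle in place of pipeline::PipelineTask:

    #include <algorithm>
    #include <cstddef>
    #include <utility>
    #include <vector>

    struct Task; // hypothetical opaque handle

    std::vector<Task*> choose_revoke_targets(
            std::vector<std::pair<size_t, Task*>> tasks, size_t mem_limit) {
        // Largest revocable tasks first.
        std::sort(tasks.begin(), tasks.end(),
                  [](auto&& l, auto&& r) { return l.first > r.first; });

        const double target = mem_limit * 0.2; // aim to free ~20% of the limit
        size_t revoked = 0;
        std::vector<Task*> chosen;
        for (auto&& [size, task] : tasks) {
            chosen.push_back(task);
            revoked += size;
            if (revoked >= target) {
                break;
            }
        }
        return chosen;
    }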
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 96b18eedfb9..b74b835af63 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -22,6 +22,7 @@
 #include <gen_cpp/Types_types.h>
 
 #include <atomic>
+#include <cstdint>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -191,6 +192,8 @@ public:
 
     std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
 
+    Status revoke_memory();
+
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
 
     std::shared_ptr<QueryStatistics> get_query_statistics();
@@ -228,7 +231,7 @@ public:
 
     void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); }
 
-    void decrease_revoking_tasks_count() { _revoking_tasks_count.fetch_sub(1); }
+    void decrease_revoking_tasks_count();
 
     int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); }
 
@@ -354,6 +357,8 @@ private:
     bool _is_pipeline = false;
     bool _is_nereids = false;
     std::atomic<int> _running_big_mem_op_num = 0;
+
+    std::mutex _revoking_tasks_mutex;
     std::atomic<int> _revoking_tasks_count = 0;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 290d1fe1d5b..92235c1ded7 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -17,13 +17,17 @@
 
 #include "workload_group_manager.h"
 
+#include <glog/logging.h>
+
 #include <algorithm>
 #include <memory>
 #include <mutex>
 #include <unordered_map>
 
+#include "common/status.h"
 #include "exec/schema_scanner/schema_scanner_helper.h"
 #include "pipeline/task_scheduler.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/mem_info.h"
@@ -198,8 +202,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
                 doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
                 PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
                 weighted_memory_limit_ratio);
-        // LOG_EVERY_T(INFO, 60) << debug_msg;
-        LOG(INFO) << debug_msg;
+        LOG_EVERY_T(INFO, 60) << debug_msg;
         for (auto& wg : _workload_groups) {
             auto wg_mem_limit = wg.second->memory_limit();
             auto wg_weighted_mem_limit = int64_t(wg_mem_limit * weighted_memory_limit_ratio);
@@ -299,8 +302,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
         }
         // During memory insufficient stage, we already set every query's memlimit, so that the flag is useless any more.
         wg.second->update_memory_sufficent(true);
-        // LOG_EVERY_T(INFO, 60) << debug_msg;
-        LOG(INFO) << debug_msg;
+        LOG_EVERY_T(INFO, 60) << debug_msg;
     }
 }
 
@@ -348,6 +350,7 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
  * strategy 5: If any query exceed process's memlimit and cache is zero, then do spill disk or cancel it.
  */
 void WorkloadGroupMgr::handle_paused_queries() {
+    const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
         auto& queries_list = it->second;
@@ -376,9 +379,13 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             }
             if (query_ctx->is_cancelled()) {
-                LOG(INFO) << "query: " << print_id(query_ctx->query_id())
-                          << "was canceled, remove from paused list";
-                query_it = queries_list.erase(query_it);
+                /// Memory may not be released immediately after a query is canceled.
+                /// So here wait for a while.
+                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                              << " was canceled, remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                }
                 continue;
             }
             if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -401,8 +408,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
             }
             // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient,
             // and then set wg's flag, other query may not free memory very quickly.
-            if (query_it->elapsed_time() > 1000) {
+            if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
                 // set wg's memory to insufficient, then add it back to task scheduler to run.
+                LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << " will be resumed.";
                 query_ctx->set_memory_sufficient(true);
                 query_it = queries_list.erase(query_it);
             } else {
@@ -437,6 +445,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
             if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
                         0.001 &&
                 query_it->cache_ratio_ > 0.001) {
+                LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                          << " will be resumed after cache adjust.";
                 query_ctx->set_memory_sufficient(true);
                 query_it = queries_list.erase(query_it);
                 continue;
@@ -479,17 +489,7 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
         }
     } else {
         SCOPED_ATTACH_TASK(query_ctx.get());
-        // TODO, should spill the task that has max memory, not all
-        for (auto* task : revocable_tasks) {
-            auto st = task->revoke_memory();
-            if (!st.ok()) {
-                query_ctx->cancel(st);
-                break;
-            }
-        }
-        LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
-                  << revocable_tasks.size()
-                  << " tasks to revoke memory, revocable size: " << revocable_size;
+        RETURN_IF_ERROR(query_ctx->revoke_memory());
     }
     return true;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org