This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d988193d39e [pipelineX](shuffle) block exchange sink by memory usage (#26595) d988193d39e is described below commit d988193d39e188b1c909ea2449238ad68b71e5ae Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Nov 9 21:28:22 2023 +0800 [pipelineX](shuffle) block exchange sink by memory usage (#26595) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +- be/src/pipeline/exec/exchange_sink_buffer.h | 2 +- be/src/pipeline/exec/exchange_sink_operator.cpp | 8 +++-- be/src/pipeline/exec/exchange_sink_operator.h | 18 ++++++++-- be/src/pipeline/exec/exchange_source_operator.cpp | 6 ++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 40 +++++++++++------------ be/src/vec/runtime/vdata_stream_recvr.h | 8 ++--- be/src/vec/sink/vdata_stream_sender.cpp | 7 ++++ 8 files changed, 58 insertions(+), 34 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 4f93a39caa6..29933fcdd15 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -62,7 +62,8 @@ namespace pipeline { template <typename Parent> ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, QueryContext* context) - : _is_finishing(false), + : _queue_capacity(0), + _is_finishing(false), _query_id(query_id), _dest_node_id(dest_node_id), _sender_id(send_id), diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 2b30f6fac70..c04de2a51f3 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -199,6 +199,7 @@ private: phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo<Parent>, std::list<TransmitInfo<Parent>>>> _instance_to_package_queue; + size_t _queue_capacity; // store data in broadcast shuffle phmap::flat_hash_map<InstanceLoId, std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>>> @@ -237,7 +238,6 @@ private: std::atomic<int> _total_queue_size = 0; static constexpr int QUEUE_CAPACITY_FACTOR = 64; - int _queue_capacity = 0; std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr; std::shared_ptr<FinishDependency> _finish_dependency = nullptr; QueryStatistics* _statistics = nullptr; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 44d6b6448ed..25418492954 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -200,8 +200,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); DCHECK(_local_channels_dependency[dep_id] != nullptr); deps_for_channels->add_child(_local_channels_dependency[dep_id]); - _wait_channel_timer[dep_id] = - ADD_CHILD_TIMER(_profile, "WaitForLocalExchangeBuffer", timer_name); + _wait_channel_timer[dep_id] = ADD_CHILD_TIMER( + _profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name); dep_id++; } } @@ -213,12 +213,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf new vectorized::XXHashPartitioner<vectorized::ShuffleChannelIds>(channels.size())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("XXHashPartitioner({})", _partition_count)); } else if (p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = channel_shared_ptrs.size(); _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( channel_shared_ptrs.size())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); } return Status::OK(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 2b42d28958a..6b9d3b5e4b1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -134,11 +134,25 @@ private: class LocalExchangeChannelDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency); - LocalExchangeChannelDependency(int id) - : WriteDependency(id, "LocalExchangeChannelDependency") {} + LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available) + : WriteDependency(id, "LocalExchangeChannelDependency"), + _mem_available(mem_available) {} ~LocalExchangeChannelDependency() override = default; + WriteDependency* write_blocked_by() override { + if (config::enable_fuzzy_mode && !_is_runnable() && + _should_log(_write_dependency_watcher.elapsed_time())) { + LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " + << id(); + } + return _is_runnable() ? nullptr : this; + } + void* shared_state() override { return nullptr; } + +private: + bool _is_runnable() const { return _ready_for_write || *_mem_available; } + std::shared_ptr<bool> _mem_available; }; class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index a37272fd741..3213ed55778 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -60,10 +60,10 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { queues[i]->set_dependency(deps[i]); source_dependency->add_child(deps[i]); } + static const std::string timer_name = + "WaitForDependency[" + source_dependency->name() + "]Time"; + _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name); for (size_t i = 0; i < queues.size(); i++) { - static const std::string timer_name = - "WaitForDependency[" + source_dependency->name() + "]Time"; - _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name); metrics[i] = ADD_CHILD_TIMER(_runtime_profile, fmt::format("WaitForData{}", i), timer_name); } RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone( diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 9384d4abbfc..be291828f0f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -103,8 +103,8 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block if (!_is_cancelled && _num_remaining_senders > 0) { _dependency->block_reading(); } - for (auto& it : _local_channel_dependency) { - it->set_ready_for_write(); + if (_local_channel_dependency) { + _local_channel_dependency->set_ready_for_write(); } } @@ -349,22 +349,23 @@ VDataStreamRecvr::VDataStreamRecvr( _profile(profile), _peak_memory_usage_counter(nullptr), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), - _enable_pipeline(state->enable_pipeline_exec()) { + _enable_pipeline(state->enable_pipeline_exec()), + _mem_available(std::make_shared<bool>(true)) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id)); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + // Create one queue per sender if is_merging is true. + int num_queues = is_merging ? num_senders : 1; if (state->enable_pipeline_x_exec()) { - _sender_to_local_channel_dependency.resize(num_senders); - for (size_t i = 0; i < num_senders; i++) { + _sender_to_local_channel_dependency.resize(num_queues); + for (size_t i = 0; i < num_queues; i++) { _sender_to_local_channel_dependency[i] = - pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id); + pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id, + _mem_available); } } - - // Create one queue per sender if is_merging is true. - int num_queues = is_merging ? num_senders : 1; _sender_queues.reserve(num_queues); int num_sender_per_queue = is_merging ? 1 : num_senders; for (int i = 0; i < num_queues; ++i) { @@ -372,14 +373,7 @@ VDataStreamRecvr::VDataStreamRecvr( if (_enable_pipeline) { queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile)); if (state->enable_pipeline_x_exec()) { - auto dependencies = - is_merging - ? std::vector<std::shared_ptr< - pipeline:: - LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency - [i]} - : _sender_to_local_channel_dependency; - queue->set_local_channel_dependency(dependencies); + queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]); } } else { queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile)); @@ -449,9 +443,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) { std::shared_ptr<pipeline::LocalExchangeChannelDependency> VDataStreamRecvr::get_local_channel_dependency(int sender_id) { - DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id); - DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr); - return _sender_to_local_channel_dependency[sender_id]; + DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr); + return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0]; } bool VDataStreamRecvr::ready_to_read() { @@ -504,7 +497,12 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); - _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); + auto val = _blocks_memory_usage_current_value.fetch_add(size); + if (val + size > config::exchg_node_buffer_size_bytes) { + *_mem_available = false; + } else { + *_mem_available = true; + } } void VDataStreamRecvr::close() { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 2f5c88301e2..e31b433491a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -186,6 +186,8 @@ private: bool _enable_pipeline; std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>> _sender_to_local_channel_dependency; + + std::shared_ptr<bool> _mem_available; }; class ThreadClosure : public google::protobuf::Closure { @@ -204,8 +206,7 @@ public: virtual ~SenderQueue(); void set_local_channel_dependency( - std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>& - local_channel_dependency) { + std::shared_ptr<pipeline::LocalExchangeChannelDependency> local_channel_dependency) { _local_channel_dependency = local_channel_dependency; } @@ -255,8 +256,7 @@ protected: std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> _local_closure; std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr; - std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>> - _local_channel_dependency; + std::shared_ptr<pipeline::LocalExchangeChannelDependency> _local_channel_dependency; }; class VDataStreamRecvr::PipSenderQueue : public SenderQueue { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 6c64cf8db04..f5397e831d3 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -460,6 +460,13 @@ Status VDataStreamSender::prepare(RuntimeState* state) { } else if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { RETURN_IF_ERROR(_partitioner->prepare(state, _row_desc)); + if (_part_type == TPartitionType::HASH_PARTITIONED) { + _profile->add_info_string("Partitioner", + fmt::format("XXHashPartitioner({})", _partition_count)); + } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + _profile->add_info_string("Partitioner", + fmt::format("Crc32HashPartitioner({})", _partition_count)); + } } _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org