This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d988193d39e [pipelineX](shuffle) block exchange sink by memory usage (#26595)
d988193d39e is described below

commit d988193d39e188b1c909ea2449238ad68b71e5ae
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Nov 9 21:28:22 2023 +0800

    [pipelineX](shuffle) block exchange sink by memory usage (#26595)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp     |  3 +-
 be/src/pipeline/exec/exchange_sink_buffer.h       |  2 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp   |  8 +++--
 be/src/pipeline/exec/exchange_sink_operator.h     | 18 ++++++++--
 be/src/pipeline/exec/exchange_source_operator.cpp |  6 ++--
 be/src/vec/runtime/vdata_stream_recvr.cpp         | 40 +++++++++++------------
 be/src/vec/runtime/vdata_stream_recvr.h           |  8 ++---
 be/src/vec/sink/vdata_stream_sender.cpp           |  7 ++++
 8 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4f93a39caa6..29933fcdd15 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -62,7 +62,8 @@ namespace pipeline {
 template <typename Parent>
 ExchangeSinkBuffer<Parent>::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId 
dest_node_id,
                                                int send_id, int be_number, 
QueryContext* context)
-        : _is_finishing(false),
+        : _queue_capacity(0),
+          _is_finishing(false),
           _query_id(query_id),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2b30f6fac70..c04de2a51f3 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -199,6 +199,7 @@ private:
     phmap::flat_hash_map<InstanceLoId,
                          std::queue<TransmitInfo<Parent>, 
std::list<TransmitInfo<Parent>>>>
             _instance_to_package_queue;
+    size_t _queue_capacity;
     // store data in broadcast shuffle
     phmap::flat_hash_map<InstanceLoId, 
std::queue<BroadcastTransmitInfo<Parent>,
                                                   
std::list<BroadcastTransmitInfo<Parent>>>>
@@ -237,7 +238,6 @@ private:
 
     std::atomic<int> _total_queue_size = 0;
     static constexpr int QUEUE_CAPACITY_FACTOR = 64;
-    int _queue_capacity = 0;
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
     std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
     QueryStatistics* _statistics = nullptr;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 44d6b6448ed..25418492954 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -200,8 +200,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                 _local_channels_dependency[dep_id] = 
channel->get_local_channel_dependency();
                 DCHECK(_local_channels_dependency[dep_id] != nullptr);
                 
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
-                _wait_channel_timer[dep_id] =
-                        ADD_CHILD_TIMER(_profile, 
"WaitForLocalExchangeBuffer", timer_name);
+                _wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
+                        _profile, fmt::format("WaitForLocalExchangeBuffer{}", 
dep_id), timer_name);
                 dep_id++;
             }
         }
@@ -213,12 +213,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
                 new 
vectorized::XXHashPartitioner<vectorized::ShuffleChannelIds>(channels.size()));
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("XXHashPartitioner({})", 
_partition_count));
     } else if (p._part_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         _partition_count = channel_shared_ptrs.size();
         _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
                 channel_shared_ptrs.size()));
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 2b42d28958a..6b9d3b5e4b1 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -134,11 +134,25 @@ private:
 class LocalExchangeChannelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
-    LocalExchangeChannelDependency(int id)
-            : WriteDependency(id, "LocalExchangeChannelDependency") {}
+    LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available)
+            : WriteDependency(id, "LocalExchangeChannelDependency"),
+              _mem_available(mem_available) {}
     ~LocalExchangeChannelDependency() override = default;
 
+    WriteDependency* write_blocked_by() override {
+        if (config::enable_fuzzy_mode && !_is_runnable() &&
+            _should_log(_write_dependency_watcher.elapsed_time())) {
+            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
+                         << id();
+        }
+        return _is_runnable() ? nullptr : this;
+    }
+
     void* shared_state() override { return nullptr; }
+
+private:
+    bool _is_runnable() const { return _ready_for_write || *_mem_available; }
+    std::shared_ptr<bool> _mem_available;
 };
 
 class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index a37272fd741..3213ed55778 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -60,10 +60,10 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
         queues[i]->set_dependency(deps[i]);
         source_dependency->add_child(deps[i]);
     }
+    static const std::string timer_name =
+            "WaitForDependency[" + source_dependency->name() + "]Time";
+    _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
     for (size_t i = 0; i < queues.size(); i++) {
-        static const std::string timer_name =
-                "WaitForDependency[" + source_dependency->name() + "]Time";
-        _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
         metrics[i] = ADD_CHILD_TIMER(_runtime_profile, 
fmt::format("WaitForData{}", i), timer_name);
     }
     
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9384d4abbfc..be291828f0f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -103,8 +103,8 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
         if (!_is_cancelled && _num_remaining_senders > 0) {
             _dependency->block_reading();
         }
-        for (auto& it : _local_channel_dependency) {
-            it->set_ready_for_write();
+        if (_local_channel_dependency) {
+            _local_channel_dependency->set_ready_for_write();
         }
     }
 
@@ -349,22 +349,23 @@ VDataStreamRecvr::VDataStreamRecvr(
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
-          _enable_pipeline(state->enable_pipeline_exec()) {
+          _enable_pipeline(state->enable_pipeline_exec()),
+          _mem_available(std::make_shared<bool>(true)) {
     // DataStreamRecvr may be destructed after the instance execution thread 
ends.
     _mem_tracker =
             std::make_unique<MemTracker>("VDataStreamRecvr:" + 
print_id(_fragment_instance_id));
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
+    // Create one queue per sender if is_merging is true.
+    int num_queues = is_merging ? num_senders : 1;
     if (state->enable_pipeline_x_exec()) {
-        _sender_to_local_channel_dependency.resize(num_senders);
-        for (size_t i = 0; i < num_senders; i++) {
+        _sender_to_local_channel_dependency.resize(num_queues);
+        for (size_t i = 0; i < num_queues; i++) {
             _sender_to_local_channel_dependency[i] =
-                    
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id);
+                    
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id,
+                                                                            
_mem_available);
         }
     }
-
-    // Create one queue per sender if is_merging is true.
-    int num_queues = is_merging ? num_senders : 1;
     _sender_queues.reserve(num_queues);
     int num_sender_per_queue = is_merging ? 1 : num_senders;
     for (int i = 0; i < num_queues; ++i) {
@@ -372,14 +373,7 @@ VDataStreamRecvr::VDataStreamRecvr(
         if (_enable_pipeline) {
             queue = _sender_queue_pool.add(new PipSenderQueue(this, 
num_sender_per_queue, profile));
             if (state->enable_pipeline_x_exec()) {
-                auto dependencies =
-                        is_merging
-                                ? std::vector<std::shared_ptr<
-                                          pipeline::
-                                                  
LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency
-                                                                               
             [i]}
-                                : _sender_to_local_channel_dependency;
-                queue->set_local_channel_dependency(dependencies);
+                
queue->set_local_channel_dependency(_sender_to_local_channel_dependency[i]);
             }
         } else {
             queue = _sender_queue_pool.add(new SenderQueue(this, 
num_sender_per_queue, profile));
@@ -449,9 +443,8 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
 
 std::shared_ptr<pipeline::LocalExchangeChannelDependency>
 VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
-    DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id);
-    DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr);
-    return _sender_to_local_channel_dependency[sender_id];
+    DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != 
nullptr);
+    return _sender_to_local_channel_dependency[_is_merging ? sender_id : 0];
 }
 
 bool VDataStreamRecvr::ready_to_read() {
@@ -504,7 +497,12 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
 
 void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
     _blocks_memory_usage->add(size);
-    _blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
+    auto val = _blocks_memory_usage_current_value.fetch_add(size);
+    if (val + size > config::exchg_node_buffer_size_bytes) {
+        *_mem_available = false;
+    } else {
+        *_mem_available = true;
+    }
 }
 
 void VDataStreamRecvr::close() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 2f5c88301e2..e31b433491a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -186,6 +186,8 @@ private:
     bool _enable_pipeline;
     std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
             _sender_to_local_channel_dependency;
+
+    std::shared_ptr<bool> _mem_available;
 };
 
 class ThreadClosure : public google::protobuf::Closure {
@@ -204,8 +206,7 @@ public:
     virtual ~SenderQueue();
 
     void set_local_channel_dependency(
-            
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>&
-                    local_channel_dependency) {
+            std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
local_channel_dependency) {
         _local_channel_dependency = local_channel_dependency;
     }
 
@@ -255,8 +256,7 @@ protected:
     std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> 
_local_closure;
 
     std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr;
-    std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
-            _local_channel_dependency;
+    std::shared_ptr<pipeline::LocalExchangeChannelDependency> 
_local_channel_dependency;
 };
 
 class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 6c64cf8db04..f5397e831d3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -460,6 +460,13 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
                _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         RETURN_IF_ERROR(_partitioner->prepare(state, _row_desc));
+        if (_part_type == TPartitionType::HASH_PARTITIONED) {
+            _profile->add_info_string("Partitioner",
+                                      fmt::format("XXHashPartitioner({})", 
_partition_count));
+        } else if (_part_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+            _profile->add_info_string("Partitioner",
+                                      fmt::format("Crc32HashPartitioner({})", 
_partition_count));
+        }
     }
 
     _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to