This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a038fdaec6 [Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463)
a038fdaec6 is described below

commit a038fdaec642ff6a42a2c776d8d76076138efd5b
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Feb 9 19:22:40 2023 +0800

    [Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463)
    
    Currently, for broadcast shuffle, we serialize a block once and then send it
    by RPC through multiple channels. After that, we serialize the next block
    into the same memory so that the memory can be reused. However, since the
    RPC is asynchronous, the next block may be serialized before the previous
    block has finished sending.
    
    So, in this PR, I use a ref count to determine whether the serialized block
    can be reused in broadcast shuffle.
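
    As a minimal sketch of this idea (the names below are illustrative
    placeholders, not the exact classes added by this patch): each reusable
    serialization buffer carries an atomic reference count that is incremented
    once per in-flight RPC and decremented in the RPC completion callback, and
    the sender only serializes into a buffer whose count has dropped back to
    zero.

        #include <atomic>
        #include <string>
        #include <vector>

        // Hypothetical, simplified holder: one per reusable serialization buffer.
        struct SerializedBlockHolder {
            std::atomic<int> refs{0}; // number of in-flight RPCs still using the buffer
            std::string bytes;        // the serialized block (placeholder type)

            bool available() const { return refs.load() == 0; }
            void ref() { refs.fetch_add(1); }   // before handing the buffer to an RPC
            void unref() { refs.fetch_sub(1); } // from each RPC completion callback
        };

        // The sender keeps a small pool and reuses only a holder that no
        // in-flight RPC still references; the caller may retry the scan
        // (the patch bounds such retries with a loop limit).
        SerializedBlockHolder* next_available(std::vector<SerializedBlockHolder>& pool) {
            for (auto& holder : pool) {
                if (holder.available()) {
                    return &holder;
                }
            }
            return nullptr;
        }

    In the actual patch, the holder wraps the atomic count in an AtomicWrapper,
    which allows holders to be stored in a std::vector (std::atomic itself is
    neither copyable nor movable).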
---
 be/src/common/config.h                        |   2 +
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 127 +++++++++++++++++---------
 be/src/pipeline/exec/exchange_sink_buffer.h   |  17 +++-
 be/src/vec/sink/vdata_stream_sender.cpp       |  48 +++++++++-
 be/src/vec/sink/vdata_stream_sender.h         |  62 ++++++++++++-
 5 files changed, 207 insertions(+), 49 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index de719094bd..b823aff817 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -896,6 +896,8 @@ CONF_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5%
 CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict");
 // tree depth for bkd index
 CONF_Int32(max_depth_in_bkd_tree, "32");
+// use num_broadcast_buffer blocks as buffer to do broadcast
+CONF_Int32(num_broadcast_buffer, "32");
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index aedeeec19b..36f5aab77c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -32,7 +32,8 @@ namespace doris::pipeline {
 template <typename T>
 class SelfDeleteClosure : public google::protobuf::Closure {
 public:
-    SelfDeleteClosure(InstanceLoId id, bool eos) : _id(id), _eos(eos) {}
+    SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr)
+            : _id(id), _eos(eos), _data(data) {}
     ~SelfDeleteClosure() override = default;
     SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
     SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
@@ -56,6 +57,9 @@ public:
             } else {
                 _suc_fn(_id, _eos, result);
             }
+            if (_data) {
+                _data->unref();
+            }
         } catch (const std::exception& exp) {
             LOG(FATAL) << "brpc callback error: " << exp.what();
         } catch (...) {
@@ -71,6 +75,7 @@ private:
     std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
     InstanceLoId _id;
     bool _eos;
+    vectorized::BroadcastPBlockHolder* _data;
 };
 
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
@@ -126,6 +131,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
     _instance_to_seq[low_id] = 0;
     _instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>();
+    _instance_to_broadcast_package_queue[low_id] =
+            std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>();
     PUniqueId finst_id;
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
@@ -155,58 +162,92 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
     return Status::OK();
 }
 
+Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
+    if (_is_finishing) {
+        return Status::OK();
+    }
+    TUniqueId ins_id = request.channel->_fragment_instance_id;
+    bool send_now = false;
+    request.block_holder->ref();
+    {
+        std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
+        // Do not have in process rpc, directly send
+        if (_instance_to_sending_by_pipeline[ins_id.lo]) {
+            send_now = true;
+            _instance_to_sending_by_pipeline[ins_id.lo] = false;
+        }
+        _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request));
+    }
+    if (send_now) {
+        RETURN_IF_ERROR(_send_rpc(ins_id.lo));
+    }
+
+    return Status::OK();
+}
+
 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
 
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
-    if (q.empty() || _is_finishing) {
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
+            _instance_to_broadcast_package_queue[id];
+
+    if (_is_finishing) {
         _instance_to_sending_by_pipeline[id] = true;
         return Status::OK();
     }
 
-    TransmitInfo& request = q.front();
-
-    if (!_instance_to_request[id]) {
-        _construct_request(id);
-    }
-
-    auto brpc_request = _instance_to_request[id];
-    brpc_request->set_eos(request.eos);
-    brpc_request->set_packet_seq(_instance_to_seq[id]++);
-    if (request.block) {
-        brpc_request->set_allocated_block(request.block.get());
-    }
+#define DO_RPC(QUEUE, BLOCK, HOLDER) \
+    auto& request = QUEUE.front(); \
+    if (!_instance_to_request[id]) { \
+        _construct_request(id); \
+    } \
+    auto brpc_request = _instance_to_request[id]; \
+    brpc_request->set_eos(request.eos); \
+    brpc_request->set_packet_seq(_instance_to_seq[id]++); \
+    if (request.BLOCK) { \
+        brpc_request->set_allocated_block(request.BLOCK); \
+    } \
+    auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, HOLDER); \
+    _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \
+    _closure->addFailedHandler( \
+            [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \
+    _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \
+                                    const PTransmitDataResult& result) { \
+        Status s = Status(result.status()); \
+        if (!s.ok()) { \
+            _failed(id, \
+                    fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \
+        } else if (eos) { \
+            _ended(id); \
+        } else { \
+            _send_rpc(id); \
+        } \
+    }); \
+    { \
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \
+        if (enable_http_send_block(*brpc_request)) { \
+            RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \
+                                                *brpc_request, request.channel->_brpc_dest_addr)); \
+        } else { \
+            transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \
+        } \
+    } \
+    if (request.BLOCK) { \
+        brpc_request->release_block(); \
+    } \
+    QUEUE.pop();
 
-    auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos);
-    _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
-    _closure->addFailedHandler(
-            [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
-    _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                    const PTransmitDataResult& result) {
-        Status s = Status(result.status());
-        if (!s.ok()) {
-            _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
-        } else if (eos) {
-            _ended(id);
-        } else {
-            _send_rpc(id);
-        }
-    });
-
-    {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
-        if (enable_http_send_block(*brpc_request)) {
-            RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
-                                                *brpc_request, request.channel->_brpc_dest_addr));
-        } else {
-            transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
-        }
-    }
-
-    if (request.block) {
-        brpc_request->release_block();
+    if (!q.empty()) {
+        // If we have data to shuffle which is not broadcasted
+        DO_RPC(q, block, nullptr)
+    } else if (!broadcast_q.empty()) {
+        // If we have data to shuffle which is broadcasted
+        DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder)
+    } else {
+        _instance_to_sending_by_pipeline[id] = true;
+        return Status::OK();
     }
-    q.pop();
 
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index eff77c05bb..7be17706ce 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -31,13 +31,20 @@
 namespace doris {
 namespace vectorized {
 class PipChannel;
-}
+class BroadcastPBlockHolder;
+} // namespace vectorized
 
 namespace pipeline {
 using InstanceLoId = int64_t;
 struct TransmitInfo {
     vectorized::PipChannel* channel;
-    std::unique_ptr<PBlock> block;
+    PBlock* block;
+    bool eos;
+};
+
+struct BroadcastTransmitInfo {
+    vectorized::PipChannel* channel;
+    vectorized::BroadcastPBlockHolder* block_holder;
     bool eos;
 };
 
@@ -50,6 +57,7 @@ public:
     ~ExchangeSinkBuffer();
     void register_sink(TUniqueId);
     Status add_block(TransmitInfo&& request);
+    Status add_block(BroadcastTransmitInfo&& request);
     bool can_write() const;
     bool is_pending_finish() const;
     void close();
@@ -57,8 +65,13 @@ public:
 private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
+    // store data in non-broadcast shuffle
     phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>>
             _instance_to_package_queue;
+    // store data in broadcast shuffle
+    phmap::flat_hash_map<InstanceLoId,
+                         std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
+            _instance_to_broadcast_package_queue;
     using PackageSeq = int64_t;
     // must init zero
     phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index c44e8acd74..c7a9e62335 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -242,7 +242,7 @@ Status Channel::close_internal() {
         RETURN_IF_ERROR(send_current_block(true));
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-        RETURN_IF_ERROR(send_block(nullptr, true));
+        RETURN_IF_ERROR(send_block((PBlock*)nullptr, true));
     }
     // Don't wait for the last packet to finish, left it to close_wait.
     return Status::OK();
@@ -287,7 +287,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
            sink.output_partition.type == TPartitionType::RANDOM ||
            sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
            sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
-    _cur_pb_block = &_pb_block1;
 
     std::map<int64_t, int64_t> fragment_id_to_channel_index;
 
@@ -317,6 +316,12 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
         }
     }
     _name = "VDataStreamSender";
+    if (state->enable_pipeline_exec()) {
+        _broadcast_pb_blocks.resize(config::num_broadcast_buffer);
+        _broadcast_pb_block_idx = 0;
+    } else {
+        _cur_pb_block = &_pb_block1;
+    }
 }
 
 VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
@@ -470,6 +475,23 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             for (auto channel : _channels) {
                 RETURN_IF_ERROR(channel->send_local_block(block));
             }
+        } else if (state->enable_pipeline_exec()) {
+            BroadcastPBlockHolder* block_holder = nullptr;
+            RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
+            {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                RETURN_IF_ERROR(
+                        serialize_block(block, block_holder->get_block(), _channels.size()));
+            }
+
+            for (auto channel : _channels) {
+                if (channel->is_local()) {
+                    RETURN_IF_ERROR(channel->send_local_block(block));
+                } else {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    RETURN_IF_ERROR(channel->send_block(block_holder, eos));
+                }
+            }
         } else {
             {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -620,6 +642,28 @@ void VDataStreamSender::_roll_pb_block() {
     _cur_pb_block = (_cur_pb_block == &_pb_block1 ? &_pb_block2 : &_pb_block1);
 }
 
+Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) {
+    constexpr int MAX_LOOP = 1000;
+
+    size_t it = 0;
+    while (it < MAX_LOOP) {
+        if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
+            _broadcast_pb_block_idx = 0;
+        }
+
+        for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
+            if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
+                _broadcast_pb_block_idx++;
+                *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1];
+                return Status::OK();
+            }
+        }
+        it++;
+    }
+    return Status::InternalError(
+            "Exceed the max loop limit when acquire the next available buffer!");
+}
+
 void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) {
     for (auto channel : _channels) {
         ((PipChannel*)channel)->registe(buffer);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index e644aaf847..19858c259d 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -49,6 +49,42 @@ namespace vectorized {
 class VExprContext;
 class Channel;
 
+template <typename T>
+struct AtomicWrapper {
+    std::atomic<T> _value;
+
+    AtomicWrapper() : _value() {}
+
+    AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
+
+    AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
+
+    AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
+};
+
+// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
+// will be shared between different channel, so we have to use a ref count to mark if this
+// PBlock is available for next serialization.
+class BroadcastPBlockHolder {
+public:
+    BroadcastPBlockHolder() : _ref_count(0) {}
+    ~BroadcastPBlockHolder() noexcept = default;
+
+    void unref() noexcept {
+        DCHECK_GT(_ref_count._value, 0);
+        _ref_count._value.fetch_sub(1);
+    }
+    void ref() noexcept { _ref_count._value.fetch_add(1); }
+
+    bool available() { return _ref_count._value == 0; }
+
+    PBlock* get_block() { return &pblock; }
+
+private:
+    AtomicWrapper<uint32_t> _ref_count;
+    PBlock pblock;
+};
+
 class VDataStreamSender : public DataSink {
 public:
     friend class pipeline::ExchangeSinkOperator;
@@ -91,6 +127,7 @@ protected:
     friend class pipeline::ExchangeSinkBuffer;
 
     void _roll_pb_block();
+    Status _get_next_available_buffer(BroadcastPBlockHolder** holder);
 
     Status get_partition_column_result(Block* block, int* result) const {
         int counter = 0;
@@ -131,6 +168,10 @@ protected:
     PBlock _pb_block2;
     PBlock* _cur_pb_block;
 
+    // used by pipeline engine
+    std::vector<BroadcastPBlockHolder> _broadcast_pb_blocks;
+    int _broadcast_pb_block_idx;
+
     // compute per-row partition values
     std::vector<VExprContext*> _partition_expr_ctxs;
 
@@ -219,6 +260,10 @@ public:
     // if batch is nullptr, send the eof packet
     virtual Status send_block(PBlock* block, bool eos = false);
 
+    virtual Status send_block(BroadcastPBlockHolder* block, bool eos = false) {
+        return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
+    }
+
     Status add_rows(Block* block, const std::vector<int>& row);
 
     virtual Status send_current_block(bool eos);
@@ -369,8 +414,21 @@ public:
             }
         }
         if (eos || block->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block(
-                    {this, block ? std::make_unique<PBlock>(*block) : nullptr, eos}));
+            RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
+        }
+        return Status::OK();
+    }
+
+    Status send_block(BroadcastPBlockHolder* block, bool eos = false) override {
+        if (eos) {
+            if (_eos_send) {
+                return Status::OK();
+            } else {
+                _eos_send = true;
+            }
+        }
+        if (eos || block->get_block()->column_metas_size()) {
+            RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
         }
         return Status::OK();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
