This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d585a8acc1 [Improvement](shuffle) Accumulate rows in a batch for shuffling (#22218)
d585a8acc1 is described below

commit d585a8acc18e4ceb4ab9e625b9721c80124d27f7
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 1 09:55:06 2023 +0800

    [Improvement](shuffle) Accumulate rows in a batch for shuffling (#22218)
---
 be/src/vec/sink/vdata_stream_sender.cpp   | 237 ++++++++++++++++++------------
 be/src/vec/sink/vdata_stream_sender.h     |  73 ++++++---
 be/test/vec/runtime/vdata_stream_test.cpp |   5 +-
 3 files changed, 197 insertions(+), 118 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index dec3b43f09..d1c768a5e6 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,6 +95,8 @@ Status Channel::init(RuntimeState* state) {
                 _fragment_instance_id, _dest_node_id);
     }
 
+    _serializer.reset(new BlockSerializer(_parent, _is_local));
+
     // In bucket shuffle join will set fragment_instance_id (-1, -1)
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
@@ -110,10 +112,9 @@ Status Channel::send_current_block(bool eos) {
         return send_local_block(eos);
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-    auto block = _mutable_block->to_block();
-    RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
-    block.clear_column_data();
-    _mutable_block->set_muatable_columns(block.mutate_columns());
+    if (eos) {
+        RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1));
+    }
     RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
     ch_roll_pb_block();
     return Status::OK();
@@ -121,8 +122,8 @@ Status Channel::send_current_block(bool eos) {
 
 Status Channel::send_local_block(bool eos) {
     SCOPED_TIMER(_parent->_local_send_timer);
-    Block block = _mutable_block->to_block();
-    _mutable_block->set_muatable_columns(block.clone_empty_columns());
+    Block block = _serializer->get_block()->to_block();
+    
_serializer->get_block()->set_muatable_columns(block.clone_empty_columns());
     if (_recvr_is_valid()) {
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
         COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
@@ -133,7 +134,7 @@ Status Channel::send_local_block(bool eos) {
         }
         return Status::OK();
     } else {
-        _mutable_block.reset();
+        _serializer->reset_block();
         return _receiver_status;
     }
 }
@@ -202,36 +203,11 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
         return Status::OK();
     }
 
-    if (_mutable_block == nullptr) {
-        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-        _mutable_block = MutableBlock::create_unique(block->clone_empty());
-    }
-
-    int row_wait_add = rows.size();
-    int batch_size = _parent->state()->batch_size();
-    const int* begin = &rows[0];
-
-    while (row_wait_add > 0) {
-        int row_add = 0;
-        int max_add = batch_size - _mutable_block->rows();
-        if (row_wait_add >= max_add) {
-            row_add = max_add;
-        } else {
-            row_add = row_wait_add;
-        }
-
-        {
-            SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-            SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
-            _mutable_block->add_rows(block, begin, begin + row_add);
-        }
-
-        row_wait_add -= row_add;
-        begin += row_add;
-
-        if (row_add == max_add) {
-            RETURN_IF_ERROR(send_current_block(false));
-        }
+    bool serialized = false;
+    RETURN_IF_ERROR(
+            _serializer->next_serialized_block(block, _ch_cur_pb_block, 1, 
&serialized, &rows));
+    if (serialized) {
+        RETURN_IF_ERROR(send_current_block(false));
     }
 
     return Status::OK();
@@ -248,7 +224,7 @@ Status Channel::close_wait(RuntimeState* state) {
         _need_close = false;
         return st;
     }
-    _mutable_block.reset();
+    _serializer->reset_block();
     return Status::OK();
 }
 
@@ -257,15 +233,15 @@ Status Channel::close_internal() {
         return Status::OK();
     }
     VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
-             << " dest_node=" << _dest_node_id
-             << " #rows= " << ((_mutable_block == nullptr) ? 0 : 
_mutable_block->rows())
+             << " dest_node=" << _dest_node_id << " #rows= "
+             << ((_serializer->get_block() == nullptr) ? 0 : 
_serializer->get_block()->rows())
              << " receiver status: " << _receiver_status;
     if (is_receiver_eof()) {
-        _mutable_block.reset();
+        _serializer->reset_block();
         return Status::OK();
     }
     Status status;
-    if (_mutable_block != nullptr && _mutable_block->rows() > 0) {
+    if (_serializer->get_block() != nullptr && 
_serializer->get_block()->rows() > 0) {
         status = send_current_block(true);
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
@@ -312,7 +288,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
           _row_desc(row_desc),
           _current_channel_idx(0),
           _part_type(sink.output_partition.type),
-          _ignore_not_found(sink.__isset.ignore_not_found ? 
sink.ignore_not_found : true),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
@@ -377,7 +352,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _row_desc(row_desc),
           _current_channel_idx(0),
           _part_type(TPartitionType::UNPARTITIONED),
-          _ignore_not_found(true),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _compress_timer(nullptr),
@@ -415,7 +389,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
           _pool(pool),
           _row_desc(row_desc),
           _current_channel_idx(0),
-          _ignore_not_found(true),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _compress_timer(nullptr),
@@ -479,9 +452,10 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, 
_row_desc));
     }
 
+    _serializer.reset(new BlockSerializer(this));
+
     _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
     _uncompressed_bytes_counter = ADD_COUNTER(profile(), 
"UncompressedRowBatchSize", TUnit::BYTES);
-    _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
     _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
     _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
     _compress_timer = ADD_TIMER(profile(), "CompressTime");
@@ -491,6 +465,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _split_block_hash_compute_timer = ADD_TIMER(profile(), 
"SplitBlockHashComputeTime");
     _split_block_distribute_by_channel_timer =
             ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime");
+    _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _blocks_sent_counter = ADD_COUNTER(profile(), "BlocksSent", TUnit::UNIT);
     _overall_throughput = profile()->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
@@ -539,6 +514,30 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     }
 
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
+#ifndef BROADCAST_ALL_CHANNELS
+#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)           
                    \
+    {                                                                          
                    \
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                        
                    \
+        bool serialized = false;                                               
                    \
+        RETURN_IF_ERROR(                                                       
                    \
+                _serializer->next_serialized_block(block, PBLOCK, 
_channels.size(), &serialized)); \
+        if (serialized) {                                                      
                    \
+            Status status;                                                     
                    \
+            for (auto channel : _channels) {                                   
                    \
+                if (!channel->is_receiver_eof()) {                             
                    \
+                    if (channel->is_local()) {                                 
                    \
+                        status = channel->send_local_block(block);             
                    \
+                    } else {                                                   
                    \
+                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());        
                    \
+                        status = channel->send_block(PBLOCK_TO_SEND, false);   
                    \
+                    }                                                          
                    \
+                    HANDLE_CHANNEL_STATUS(state, channel, status);             
                    \
+                }                                                              
                    \
+            }                                                                  
                    \
+            POST_PROCESS;                                                      
                    \
+        }                                                                      
                    \
+    }
+#endif
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
@@ -553,45 +552,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         } else if (_enable_pipeline_exec) {
             BroadcastPBlockHolder* block_holder = nullptr;
             RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
-            {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                RETURN_IF_ERROR(
-                        serialize_block(block, block_holder->get_block(), 
_channels.size()));
-            }
-
-            Status status;
-            for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    if (channel->is_local()) {
-                        status = channel->send_local_block(block);
-                    } else {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status = channel->send_block(block_holder, eos);
-                    }
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
-                }
-            }
+            BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, );
         } else {
-            {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, 
_channels.size()));
-            }
-
-            Status status;
-            for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    if (channel->is_local()) {
-                        status = channel->send_local_block(block);
-                    } else {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status = channel->send_block(_cur_pb_block, eos);
-                    }
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
-                }
-            }
-            // rollover
-            _roll_pb_block();
+            BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, 
_roll_pb_block());
         }
+#undef BROADCAST_ALL_CHANNELS
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
         Channel* current_channel = _channels[_current_channel_idx];
@@ -602,7 +567,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                RETURN_IF_ERROR(serialize_block(block, 
current_channel->ch_cur_pb_block()));
+                RETURN_IF_ERROR(
+                        _serializer->serialize_block(block, 
current_channel->ch_cur_pb_block()));
                 auto status = 
current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
@@ -676,6 +642,28 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 }
 
 Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
+    if (_serializer->get_block() && _serializer->get_block()->rows() > 0) {
+        BroadcastPBlockHolder* block_holder = nullptr;
+        RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
+        {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            auto block = _serializer->get_block()->to_block();
+            RETURN_IF_ERROR(_serializer->serialize_block(&block, 
block_holder->get_block(),
+                                                         _channels.size()));
+            Status status;
+            for (auto channel : _channels) {
+                if (!channel->is_receiver_eof()) {
+                    if (channel->is_local()) {
+                        status = channel->send_local_block(block);
+                    } else {
+                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                        status = channel->send_block(block_holder, false);
+                    }
+                    HANDLE_CHANNEL_STATUS(state, channel, status);
+                }
+            }
+        }
+    }
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
         Status st = _channels[i]->close(state);
@@ -693,6 +681,27 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
 
     Status final_st = Status::OK();
     if (!state->enable_pipeline_exec()) {
+        {
+            // send last block
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            if (_serializer->get_block() && _serializer->get_block()->rows() > 
0) {
+                auto block = _serializer->get_block()->to_block();
+                RETURN_IF_ERROR(
+                        _serializer->serialize_block(&block, _cur_pb_block, 
_channels.size()));
+                Status status;
+                for (auto channel : _channels) {
+                    if (!channel->is_receiver_eof()) {
+                        if (channel->is_local()) {
+                            status = channel->send_local_block(block);
+                        } else {
+                            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                            status = channel->send_block(_cur_pb_block, false);
+                        }
+                        HANDLE_CHANNEL_STATUS(state, channel, status);
+                    }
+                }
+            }
+        }
         for (int i = 0; i < _channels.size(); ++i) {
             Status st = _channels[i]->close(state);
             if (!st.ok() && final_st.ok()) {
@@ -712,17 +721,61 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
     return final_st;
 }
 
-Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int 
num_receivers) {
+BlockSerializer::BlockSerializer(VDataStreamSender* parent, bool is_local)
+        : _parent(parent), _is_local(is_local), 
_batch_size(parent->state()->batch_size()) {}
+
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int 
num_receivers,
+                                              bool* serialized, const 
std::vector<int>* rows) {
+    if (_mutable_block == nullptr) {
+        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
+        _mutable_block = MutableBlock::create_unique(block->clone_empty());
+    }
+
+    {
+        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
+        if (rows) {
+            SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
+            const int* begin = &(*rows)[0];
+            _mutable_block->add_rows(block, begin, begin + rows->size());
+        } else {
+            SCOPED_TIMER(_parent->_merge_block_timer);
+            RETURN_IF_ERROR(_mutable_block->merge(*block));
+        }
+    }
+
+    if (_mutable_block->rows() >= _batch_size) {
+        if (!_is_local) {
+            RETURN_IF_ERROR(serialize_block(dest, num_receivers));
+        }
+        *serialized = true;
+        return Status::OK();
+    }
+    *serialized = false;
+    return Status::OK();
+}
+
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
+    if (_mutable_block && _mutable_block->rows() > 0) {
+        auto block = _mutable_block->to_block();
+        RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
+        block.clear_column_data();
+        _mutable_block->set_muatable_columns(block.mutate_columns());
+    }
+
+    return Status::OK();
+}
+
+Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int 
num_receivers) {
     {
-        SCOPED_TIMER(_serialize_batch_timer);
+        SCOPED_TIMER(_parent->_serialize_batch_timer);
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, 
&uncompressed_bytes,
-                                       &compressed_bytes, _compression_type,
-                                       _transfer_large_data_by_brpc));
-        COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
-        COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
-        COUNTER_UPDATE(_compress_timer, src->get_compress_time());
+        RETURN_IF_ERROR(src->serialize(
+                _parent->_state->be_exec_version(), dest, &uncompressed_bytes, 
&compressed_bytes,
+                _parent->_compression_type, 
_parent->_transfer_large_data_by_brpc));
+        COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * 
num_receivers);
+        COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, 
uncompressed_bytes * num_receivers);
+        COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
     }
 
     return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 4dbe2625d9..e77518df2c 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -67,6 +67,27 @@ class ExchangeSinkOperator;
 
 namespace vectorized {
 class Channel;
+class VDataStreamSender;
+
+class BlockSerializer {
+public:
+    BlockSerializer(VDataStreamSender* parent, bool is_local = false);
+    Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, 
bool* serialized,
+                                 const std::vector<int>* rows = nullptr);
+    Status serialize_block(PBlock* dest, int num_receivers = 1);
+    Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
+
+    MutableBlock* get_block() const { return _mutable_block.get(); }
+
+    void reset_block() { _mutable_block.reset(); }
+
+private:
+    VDataStreamSender* _parent;
+    std::unique_ptr<MutableBlock> _mutable_block;
+
+    bool _is_local;
+    const int _batch_size;
+};
 
 class VDataStreamSender : public DataSink {
 public:
@@ -98,8 +119,6 @@ public:
 
     RuntimeState* state() { return _state; }
 
-    Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
-
     void registe_channels(pipeline::ExchangeSinkBuffer* buffer);
 
     bool channel_all_can_write();
@@ -110,6 +129,7 @@ protected:
     friend class Channel;
     friend class PipChannel;
     friend class pipeline::ExchangeSinkBuffer;
+    friend class BlockSerializer;
 
     void _roll_pb_block();
     Status _get_next_available_buffer(BroadcastPBlockHolder** holder);
@@ -129,15 +149,6 @@ protected:
     template <typename ChannelPtrType>
     void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st);
 
-    struct hash_128 {
-        uint64_t high;
-        uint64_t low;
-    };
-
-    using hash_128_t = hash_128;
-
-    Status handle_unpartitioned(Block* block);
-
     // Sender instance id, unique within a fragment.
     int _sender_id;
 
@@ -148,7 +159,6 @@ protected:
     int _current_channel_idx; // index of current channel to send to if 
_random == true
 
     TPartitionType::type _part_type;
-    bool _ignore_not_found;
 
     // serialized batches for broadcasting; we need two so we can write
     // one while the other one is still being sent
@@ -173,12 +183,12 @@ protected:
     RuntimeProfile::Counter* _brpc_wait_timer;
     RuntimeProfile::Counter* _bytes_sent_counter;
     RuntimeProfile::Counter* _uncompressed_bytes_counter;
-    RuntimeProfile::Counter* _ignore_rows;
     RuntimeProfile::Counter* _local_sent_rows;
     RuntimeProfile::Counter* _local_send_timer;
     RuntimeProfile::Counter* _split_block_hash_compute_timer;
     RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
     RuntimeProfile::Counter* _blocks_sent_counter;
+    RuntimeProfile::Counter* _merge_block_timer;
 
     std::unique_ptr<MemTracker> _mem_tracker;
 
@@ -196,6 +206,8 @@ protected:
 
     bool _only_local_exchange = false;
     bool _enable_pipeline_exec = false;
+
+    std::unique_ptr<BlockSerializer> _serializer;
 };
 
 class Channel {
@@ -253,7 +265,7 @@ public:
         return Status::InternalError("Send BroadcastPBlockHolder is not 
allowed!");
     }
 
-    Status add_rows(Block* block, const std::vector<int>& row);
+    virtual Status add_rows(Block* block, const std::vector<int>& row);
 
     virtual Status send_current_block(bool eos);
 
@@ -342,9 +354,6 @@ protected:
     int64_t _num_data_bytes_sent;
     int64_t _packet_seq;
 
-    // we're accumulating rows into this batch
-    std::unique_ptr<MutableBlock> _mutable_block;
-
     bool _need_close;
     bool _closed;
     int _be_number;
@@ -373,6 +382,8 @@ protected:
     PBlock* _ch_cur_pb_block = nullptr;
     PBlock _ch_pb_block1;
     PBlock _ch_pb_block2;
+
+    std::unique_ptr<BlockSerializer> _serializer;
 };
 
 #define HANDLE_CHANNEL_STATUS(state, channel, status)    \
@@ -469,20 +480,33 @@ public:
         return Status::OK();
     }
 
+    Status add_rows(Block* block, const std::vector<int>& rows) override {
+        if (_fragment_instance_id.lo == -1) {
+            return Status::OK();
+        }
+
+        bool serialized = false;
+        _pblock = std::make_unique<PBlock>();
+        RETURN_IF_ERROR(
+                _serializer->next_serialized_block(block, _pblock.get(), 1, 
&serialized, &rows));
+        if (serialized) {
+            RETURN_IF_ERROR(send_current_block(false));
+        }
+
+        return Status::OK();
+    }
+
     // send _mutable_block
     Status send_current_block(bool eos) override {
         if (is_local()) {
             return send_local_block(eos);
         }
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-        auto block_ptr = std::make_unique<PBlock>();
-        if (_mutable_block) {
-            auto block = _mutable_block->to_block();
-            RETURN_IF_ERROR(_parent->serialize_block(&block, block_ptr.get()));
-            block.clear_column_data();
-            _mutable_block->set_muatable_columns(block.mutate_columns());
+        if (eos) {
+            _pblock = std::make_unique<PBlock>();
+            RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1));
         }
-        RETURN_IF_ERROR(send_block(block_ptr.release(), eos));
+        RETURN_IF_ERROR(send_block(_pblock.release(), eos));
         return Status::OK();
     }
 
@@ -508,6 +532,7 @@ private:
     pipeline::ExchangeSinkBuffer* _buffer = nullptr;
     bool _eos_send = false;
     std::unique_ptr<pipeline::SelfDeleteClosure<PTransmitDataResult>> _closure 
= nullptr;
+    std::unique_ptr<PBlock> _pblock;
 };
 
 } // namespace vectorized
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp
index 954bf8f194..6c10b89aa7 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -208,14 +208,15 @@ TEST_F(VDataStreamTest, BasicTest) {
     vectorized::Block block({type_and_name});
     sender.send(&runtime_stat, &block);
 
+    Status exec_status;
+    sender.close(&runtime_stat, exec_status);
+
     Block block_2;
     bool eos;
     recv->get_next(&block_2, &eos);
 
     EXPECT_EQ(block_2.rows(), 1024);
 
-    Status exec_status;
-    sender.close(&runtime_stat, exec_status);
     recv->close();
 }
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to