This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d585a8acc1 [Improvement](shuffle) Accumulate rows in a batch for shuffling (#22218) d585a8acc1 is described below commit d585a8acc18e4ceb4ab9e625b9721c80124d27f7 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Aug 1 09:55:06 2023 +0800 [Improvement](shuffle) Accumulate rows in a batch for shuffling (#22218) --- be/src/vec/sink/vdata_stream_sender.cpp | 237 ++++++++++++++++++------------ be/src/vec/sink/vdata_stream_sender.h | 73 ++++++--- be/test/vec/runtime/vdata_stream_test.cpp | 5 +- 3 files changed, 197 insertions(+), 118 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index dec3b43f09..d1c768a5e6 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,6 +95,8 @@ Status Channel::init(RuntimeState* state) { _fragment_instance_id, _dest_node_id); } + _serializer.reset(new BlockSerializer(_parent, _is_local)); + // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. 
the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() @@ -110,10 +112,9 @@ Status Channel::send_current_block(bool eos) { return send_local_block(eos); } SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - auto block = _mutable_block->to_block(); - RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block)); - block.clear_column_data(); - _mutable_block->set_muatable_columns(block.mutate_columns()); + if (eos) { + RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1)); + } RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); ch_roll_pb_block(); return Status::OK(); @@ -121,8 +122,8 @@ Status Channel::send_current_block(bool eos) { Status Channel::send_local_block(bool eos) { SCOPED_TIMER(_parent->_local_send_timer); - Block block = _mutable_block->to_block(); - _mutable_block->set_muatable_columns(block.clone_empty_columns()); + Block block = _serializer->get_block()->to_block(); + _serializer->get_block()->set_muatable_columns(block.clone_empty_columns()); if (_recvr_is_valid()) { COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes()); COUNTER_UPDATE(_parent->_local_sent_rows, block.rows()); @@ -133,7 +134,7 @@ Status Channel::send_local_block(bool eos) { } return Status::OK(); } else { - _mutable_block.reset(); + _serializer->reset_block(); return _receiver_status; } } @@ -202,36 +203,11 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) { return Status::OK(); } - if (_mutable_block == nullptr) { - SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - _mutable_block = MutableBlock::create_unique(block->clone_empty()); - } - - int row_wait_add = rows.size(); - int batch_size = _parent->state()->batch_size(); - const int* begin = &rows[0]; - - while (row_wait_add > 0) { - int row_add = 0; - int max_add = batch_size - _mutable_block->rows(); - if (row_wait_add >= max_add) { - row_add = max_add; - } else { - row_add = row_wait_add; - } - - { - 
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer); - _mutable_block->add_rows(block, begin, begin + row_add); - } - - row_wait_add -= row_add; - begin += row_add; - - if (row_add == max_add) { - RETURN_IF_ERROR(send_current_block(false)); - } + bool serialized = false; + RETURN_IF_ERROR( + _serializer->next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows)); + if (serialized) { + RETURN_IF_ERROR(send_current_block(false)); } return Status::OK(); @@ -248,7 +224,7 @@ Status Channel::close_wait(RuntimeState* state) { _need_close = false; return st; } - _mutable_block.reset(); + _serializer->reset_block(); return Status::OK(); } @@ -257,15 +233,15 @@ Status Channel::close_internal() { return Status::OK(); } VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id - << " dest_node=" << _dest_node_id - << " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows()) + << " dest_node=" << _dest_node_id << " #rows= " + << ((_serializer->get_block() == nullptr) ? 0 : _serializer->get_block()->rows()) << " receiver status: " << _receiver_status; if (is_receiver_eof()) { - _mutable_block.reset(); + _serializer->reset_block(); return Status::OK(); } Status status; - if (_mutable_block != nullptr && _mutable_block->rows() > 0) { + if (_serializer->get_block() != nullptr && _serializer->get_block()->rows() > 0) { status = send_current_block(true); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); @@ -312,7 +288,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _row_desc(row_desc), _current_channel_idx(0), _part_type(sink.output_partition.type), - _ignore_not_found(sink.__isset.ignore_not_found ? 
sink.ignore_not_found : true), _profile(nullptr), _serialize_batch_timer(nullptr), _bytes_sent_counter(nullptr), @@ -377,7 +352,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD _row_desc(row_desc), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), - _ignore_not_found(true), _profile(nullptr), _serialize_batch_timer(nullptr), _compress_timer(nullptr), @@ -415,7 +389,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_ _pool(pool), _row_desc(row_desc), _current_channel_idx(0), - _ignore_not_found(true), _profile(nullptr), _serialize_batch_timer(nullptr), _compress_timer(nullptr), @@ -479,9 +452,10 @@ Status VDataStreamSender::prepare(RuntimeState* state) { RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc)); } + _serializer.reset(new BlockSerializer(this)); + _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); - _ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT); _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT); _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); _compress_timer = ADD_TIMER(profile(), "CompressTime"); @@ -491,6 +465,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) { _split_block_hash_compute_timer = ADD_TIMER(profile(), "SplitBlockHashComputeTime"); _split_block_distribute_by_channel_timer = ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime"); + _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime"); _blocks_sent_counter = ADD_COUNTER(profile(), "BlocksSent", TUnit::UNIT); _overall_throughput = profile()->add_derived_counter( "OverallThroughput", TUnit::BYTES_PER_SECOND, @@ -539,6 +514,30 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { +#ifndef 
BROADCAST_ALL_CHANNELS +#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \ + { \ + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ + bool serialized = false; \ + RETURN_IF_ERROR( \ + _serializer->next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \ + if (serialized) { \ + Status status; \ + for (auto channel : _channels) { \ + if (!channel->is_receiver_eof()) { \ + if (channel->is_local()) { \ + status = channel->send_local_block(block); \ + } else { \ + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ + status = channel->send_block(PBLOCK_TO_SEND, false); \ + } \ + HANDLE_CHANNEL_STATUS(state, channel, status); \ + } \ + } \ + POST_PROCESS; \ + } \ + } +#endif // 1. serialize depends on it is not local exchange // 2. send block // 3. rollover block @@ -553,45 +552,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } else if (_enable_pipeline_exec) { BroadcastPBlockHolder* block_holder = nullptr; RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - RETURN_IF_ERROR( - serialize_block(block, block_holder->get_block(), _channels.size())); - } - - Status status; - for (auto channel : _channels) { - if (!channel->is_receiver_eof()) { - if (channel->is_local()) { - status = channel->send_local_block(block); - } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = channel->send_block(block_holder, eos); - } - HANDLE_CHANNEL_STATUS(state, channel, status); - } - } + BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, ); } else { - { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size())); - } - - Status status; - for (auto channel : _channels) { - if (!channel->is_receiver_eof()) { - if (channel->is_local()) { - status = channel->send_local_block(block); - } else { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - status = 
channel->send_block(_cur_pb_block, eos); - } - HANDLE_CHANNEL_STATUS(state, channel, status); - } - } - // rollover - _roll_pb_block(); + BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, _roll_pb_block()); } +#undef BROADCAST_ALL_CHANNELS } else if (_part_type == TPartitionType::RANDOM) { // 1. select channel Channel* current_channel = _channels[_current_channel_idx]; @@ -602,7 +567,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block())); + RETURN_IF_ERROR( + _serializer->serialize_block(block, current_channel->ch_cur_pb_block())); auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); @@ -676,6 +642,28 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { + if (_serializer->get_block() && _serializer->get_block()->rows() > 0) { + BroadcastPBlockHolder* block_holder = nullptr; + RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + auto block = _serializer->get_block()->to_block(); + RETURN_IF_ERROR(_serializer->serialize_block(&block, block_holder->get_block(), + _channels.size())); + Status status; + for (auto channel : _channels) { + if (!channel->is_receiver_eof()) { + if (channel->is_local()) { + status = channel->send_local_block(block); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + status = channel->send_block(block_holder, false); + } + HANDLE_CHANNEL_STATUS(state, channel, status); + } + } + } + } Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); @@ -693,6 +681,27 @@ 
Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { Status final_st = Status::OK(); if (!state->enable_pipeline_exec()) { + { + // send last block + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (_serializer->get_block() && _serializer->get_block()->rows() > 0) { + auto block = _serializer->get_block()->to_block(); + RETURN_IF_ERROR( + _serializer->serialize_block(&block, _cur_pb_block, _channels.size())); + Status status; + for (auto channel : _channels) { + if (!channel->is_receiver_eof()) { + if (channel->is_local()) { + status = channel->send_local_block(block); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + status = channel->send_block(_cur_pb_block, false); + } + HANDLE_CHANNEL_STATUS(state, channel, status); + } + } + } + } for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); if (!st.ok() && final_st.ok()) { @@ -712,17 +721,61 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { return final_st; } -Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_receivers) { +BlockSerializer::BlockSerializer(VDataStreamSender* parent, bool is_local) + : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} + +Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers, + bool* serialized, const std::vector<int>* rows) { + if (_mutable_block == nullptr) { + SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); + _mutable_block = MutableBlock::create_unique(block->clone_empty()); + } + + { + SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); + if (rows) { + SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer); + const int* begin = &(*rows)[0]; + _mutable_block->add_rows(block, begin, begin + rows->size()); + } else { + SCOPED_TIMER(_parent->_merge_block_timer); + RETURN_IF_ERROR(_mutable_block->merge(*block)); + } + } + + if (_mutable_block->rows() >= _batch_size) { + 
if (!_is_local) { + RETURN_IF_ERROR(serialize_block(dest, num_receivers)); + } + *serialized = true; + return Status::OK(); + } + *serialized = false; + return Status::OK(); +} + +Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) { + if (_mutable_block && _mutable_block->rows() > 0) { + auto block = _mutable_block->to_block(); + RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers)); + block.clear_column_data(); + _mutable_block->set_muatable_columns(block.mutate_columns()); + } + + return Status::OK(); +} + +Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int num_receivers) { { - SCOPED_TIMER(_serialize_batch_timer); + SCOPED_TIMER(_parent->_serialize_batch_timer); dest->Clear(); size_t uncompressed_bytes = 0, compressed_bytes = 0; - RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, &uncompressed_bytes, - &compressed_bytes, _compression_type, - _transfer_large_data_by_brpc)); - COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers); - COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); - COUNTER_UPDATE(_compress_timer, src->get_compress_time()); + RETURN_IF_ERROR(src->serialize( + _parent->_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes, + _parent->_compression_type, _parent->_transfer_large_data_by_brpc)); + COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers); + COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers); + COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time()); } return Status::OK(); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 4dbe2625d9..e77518df2c 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -67,6 +67,27 @@ class ExchangeSinkOperator; namespace vectorized { class Channel; +class VDataStreamSender; + +class BlockSerializer { +public: + 
BlockSerializer(VDataStreamSender* parent, bool is_local = false); + Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, + const std::vector<int>* rows = nullptr); + Status serialize_block(PBlock* dest, int num_receivers = 1); + Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); + + MutableBlock* get_block() const { return _mutable_block.get(); } + + void reset_block() { _mutable_block.reset(); } + +private: + VDataStreamSender* _parent; + std::unique_ptr<MutableBlock> _mutable_block; + + bool _is_local; + const int _batch_size; +}; class VDataStreamSender : public DataSink { public: @@ -98,8 +119,6 @@ public: RuntimeState* state() { return _state; } - Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); - void registe_channels(pipeline::ExchangeSinkBuffer* buffer); bool channel_all_can_write(); @@ -110,6 +129,7 @@ protected: friend class Channel; friend class PipChannel; friend class pipeline::ExchangeSinkBuffer; + friend class BlockSerializer; void _roll_pb_block(); Status _get_next_available_buffer(BroadcastPBlockHolder** holder); @@ -129,15 +149,6 @@ protected: template <typename ChannelPtrType> void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st); - struct hash_128 { - uint64_t high; - uint64_t low; - }; - - using hash_128_t = hash_128; - - Status handle_unpartitioned(Block* block); - // Sender instance id, unique within a fragment. 
int _sender_id; @@ -148,7 +159,6 @@ protected: int _current_channel_idx; // index of current channel to send to if _random == true TPartitionType::type _part_type; - bool _ignore_not_found; // serialized batches for broadcasting; we need two so we can write // one while the other one is still being sent @@ -173,12 +183,12 @@ protected: RuntimeProfile::Counter* _brpc_wait_timer; RuntimeProfile::Counter* _bytes_sent_counter; RuntimeProfile::Counter* _uncompressed_bytes_counter; - RuntimeProfile::Counter* _ignore_rows; RuntimeProfile::Counter* _local_sent_rows; RuntimeProfile::Counter* _local_send_timer; RuntimeProfile::Counter* _split_block_hash_compute_timer; RuntimeProfile::Counter* _split_block_distribute_by_channel_timer; RuntimeProfile::Counter* _blocks_sent_counter; + RuntimeProfile::Counter* _merge_block_timer; std::unique_ptr<MemTracker> _mem_tracker; @@ -196,6 +206,8 @@ protected: bool _only_local_exchange = false; bool _enable_pipeline_exec = false; + + std::unique_ptr<BlockSerializer> _serializer; }; class Channel { @@ -253,7 +265,7 @@ public: return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); } - Status add_rows(Block* block, const std::vector<int>& row); + virtual Status add_rows(Block* block, const std::vector<int>& row); virtual Status send_current_block(bool eos); @@ -342,9 +354,6 @@ protected: int64_t _num_data_bytes_sent; int64_t _packet_seq; - // we're accumulating rows into this batch - std::unique_ptr<MutableBlock> _mutable_block; - bool _need_close; bool _closed; int _be_number; @@ -373,6 +382,8 @@ protected: PBlock* _ch_cur_pb_block = nullptr; PBlock _ch_pb_block1; PBlock _ch_pb_block2; + + std::unique_ptr<BlockSerializer> _serializer; }; #define HANDLE_CHANNEL_STATUS(state, channel, status) \ @@ -469,20 +480,33 @@ public: return Status::OK(); } + Status add_rows(Block* block, const std::vector<int>& rows) override { + if (_fragment_instance_id.lo == -1) { + return Status::OK(); + } + + bool serialized = false; + 
_pblock = std::make_unique<PBlock>(); + RETURN_IF_ERROR( + _serializer->next_serialized_block(block, _pblock.get(), 1, &serialized, &rows)); + if (serialized) { + RETURN_IF_ERROR(send_current_block(false)); + } + + return Status::OK(); + } + // send _mutable_block Status send_current_block(bool eos) override { if (is_local()) { return send_local_block(eos); } SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - auto block_ptr = std::make_unique<PBlock>(); - if (_mutable_block) { - auto block = _mutable_block->to_block(); - RETURN_IF_ERROR(_parent->serialize_block(&block, block_ptr.get())); - block.clear_column_data(); - _mutable_block->set_muatable_columns(block.mutate_columns()); + if (eos) { + _pblock = std::make_unique<PBlock>(); + RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1)); } - RETURN_IF_ERROR(send_block(block_ptr.release(), eos)); + RETURN_IF_ERROR(send_block(_pblock.release(), eos)); return Status::OK(); } @@ -508,6 +532,7 @@ private: pipeline::ExchangeSinkBuffer* _buffer = nullptr; bool _eos_send = false; std::unique_ptr<pipeline::SelfDeleteClosure<PTransmitDataResult>> _closure = nullptr; + std::unique_ptr<PBlock> _pblock; }; } // namespace vectorized diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 954bf8f194..6c10b89aa7 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -208,14 +208,15 @@ TEST_F(VDataStreamTest, BasicTest) { vectorized::Block block({type_and_name}); sender.send(&runtime_stat, &block); + Status exec_status; + sender.close(&runtime_stat, exec_status); + Block block_2; bool eos; recv->get_next(&block_2, &eos); EXPECT_EQ(block_2.rows(), 1024); - Status exec_status; - sender.close(&runtime_stat, exec_status); recv->close(); } } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional 
commands, e-mail: commits-h...@doris.apache.org