This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a038fdaec6 [Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463) a038fdaec6 is described below commit a038fdaec642ff6a42a2c776d8d76076138efd5b Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Feb 9 19:22:40 2023 +0800 [Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463) Currently, for broadcast shuffle, we serialize a block once and then send it by RPC through multiple channels. After that, we serialize the next block into the same memory so that the buffer is reused. However, since the RPC is asynchronous, the next block may be serialized before the previous block has been sent. So, in this PR, I use a ref count to determine whether the serialized block can be reused in broadcast shuffle. --- be/src/common/config.h | 2 + be/src/pipeline/exec/exchange_sink_buffer.cpp | 127 +++++++++++++++++--------- be/src/pipeline/exec/exchange_sink_buffer.h | 17 +++- be/src/vec/sink/vdata_stream_sender.cpp | 48 +++++++++- be/src/vec/sink/vdata_stream_sender.h | 62 ++++++++++++- 5 files changed, 207 insertions(+), 49 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index de719094bd..b823aff817 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -896,6 +896,8 @@ CONF_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5% CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict"); // tree depth for bkd index CONF_Int32(max_depth_in_bkd_tree, "32"); +// use num_broadcast_buffer blocks as buffer to do broadcast +CONF_Int32(num_broadcast_buffer, "32"); #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index aedeeec19b..36f5aab77c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -32,7 +32,8 @@ namespace doris::pipeline { 
template <typename T> class SelfDeleteClosure : public google::protobuf::Closure { public: - SelfDeleteClosure(InstanceLoId id, bool eos) : _id(id), _eos(eos) {} + SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr) + : _id(id), _eos(eos), _data(data) {} ~SelfDeleteClosure() override = default; SelfDeleteClosure(const SelfDeleteClosure& other) = delete; SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete; @@ -56,6 +57,9 @@ public: } else { _suc_fn(_id, _eos, result); } + if (_data) { + _data->unref(); + } } catch (const std::exception& exp) { LOG(FATAL) << "brpc callback error: " << exp.what(); } catch (...) { @@ -71,6 +75,7 @@ private: std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn; InstanceLoId _id; bool _eos; + vectorized::BroadcastPBlockHolder* _data; }; ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, @@ -126,6 +131,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>(); _instance_to_seq[low_id] = 0; _instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>(); + _instance_to_broadcast_package_queue[low_id] = + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); @@ -155,58 +162,92 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } +Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { + if (_is_finishing) { + return Status::OK(); + } + TUniqueId ins_id = request.channel->_fragment_instance_id; + bool send_now = false; + request.block_holder->ref(); + { + std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]); + // Do not have in process rpc, directly send + if (_instance_to_sending_by_pipeline[ins_id.lo]) 
{ + send_now = true; + _instance_to_sending_by_pipeline[ins_id.lo] = false; + } + _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request)); + } + if (send_now) { + RETURN_IF_ERROR(_send_rpc(ins_id.lo)); + } + + return Status::OK(); +} + Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; - if (q.empty() || _is_finishing) { + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = + _instance_to_broadcast_package_queue[id]; + + if (_is_finishing) { _instance_to_sending_by_pipeline[id] = true; return Status::OK(); } - TransmitInfo& request = q.front(); - - if (!_instance_to_request[id]) { - _construct_request(id); - } - - auto brpc_request = _instance_to_request[id]; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); - if (request.block) { - brpc_request->set_allocated_block(request.block.get()); - } +#define DO_RPC(QUEUE, BLOCK, HOLDER) \ + auto& request = QUEUE.front(); \ + if (!_instance_to_request[id]) { \ + _construct_request(id); \ + } \ + auto brpc_request = _instance_to_request[id]; \ + brpc_request->set_eos(request.eos); \ + brpc_request->set_packet_seq(_instance_to_seq[id]++); \ + if (request.BLOCK) { \ + brpc_request->set_allocated_block(request.BLOCK); \ + } \ + auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, HOLDER); \ + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \ + _closure->addFailedHandler( \ + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \ + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \ + const PTransmitDataResult& result) { \ + Status s = Status(result.status()); \ + if (!s.ok()) { \ + _failed(id, \ + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \ + } else 
if (eos) { \ + _ended(id); \ + } else { \ + _send_rpc(id); \ + } \ + }); \ + { \ + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \ + if (enable_http_send_block(*brpc_request)) { \ + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \ + *brpc_request, request.channel->_brpc_dest_addr)); \ + } else { \ + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \ + } \ + } \ + if (request.BLOCK) { \ + brpc_request->release_block(); \ + } \ + QUEUE.pop(); - auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos); - _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); - _closure->addFailedHandler( - [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result) { - Status s = Status(result.status()); - if (!s.ok()) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); - } else if (eos) { - _ended(id); - } else { - _send_rpc(id); - } - }); - - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, - *brpc_request, request.channel->_brpc_dest_addr)); - } else { - transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); - } - } - - if (request.block) { - brpc_request->release_block(); + if (!q.empty()) { + // If we have data to shuffle which is not broadcasted + DO_RPC(q, block, nullptr) + } else if (!broadcast_q.empty()) { + // If we have data to shuffle which is broadcasted + DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder) + } else { + _instance_to_sending_by_pipeline[id] = true; + return Status::OK(); } - q.pop(); return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h index eff77c05bb..7be17706ce 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -31,13 +31,20 @@ namespace doris { namespace vectorized { class PipChannel; -} +class BroadcastPBlockHolder; +} // namespace vectorized namespace pipeline { using InstanceLoId = int64_t; struct TransmitInfo { vectorized::PipChannel* channel; - std::unique_ptr<PBlock> block; + PBlock* block; + bool eos; +}; + +struct BroadcastTransmitInfo { + vectorized::PipChannel* channel; + vectorized::BroadcastPBlockHolder* block_holder; bool eos; }; @@ -50,6 +57,7 @@ public: ~ExchangeSinkBuffer(); void register_sink(TUniqueId); Status add_block(TransmitInfo&& request); + Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; bool is_pending_finish() const; void close(); @@ -57,8 +65,13 @@ public: private: phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>> _instance_to_package_queue_mutex; + // store data in non-broadcast shuffle phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>> _instance_to_package_queue; + // store data in broadcast shuffle + phmap::flat_hash_map<InstanceLoId, + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>> + _instance_to_broadcast_package_queue; using PackageSeq = int64_t; // must init zero phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c44e8acd74..c7a9e62335 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -242,7 +242,7 @@ Status Channel::close_internal() { RETURN_IF_ERROR(send_current_block(true)); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - RETURN_IF_ERROR(send_block(nullptr, true)); + RETURN_IF_ERROR(send_block((PBlock*)nullptr, true)); } // Don't wait for the last packet to finish, left it 
to close_wait. return Status::OK(); @@ -287,7 +287,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); - _cur_pb_block = &_pb_block1; std::map<int64_t, int64_t> fragment_id_to_channel_index; @@ -317,6 +316,12 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int } } _name = "VDataStreamSender"; + if (state->enable_pipeline_exec()) { + _broadcast_pb_blocks.resize(config::num_broadcast_buffer); + _broadcast_pb_block_idx = 0; + } else { + _cur_pb_block = &_pb_block1; + } } VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, @@ -470,6 +475,23 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { for (auto channel : _channels) { RETURN_IF_ERROR(channel->send_local_block(block)); } + } else if (state->enable_pipeline_exec()) { + BroadcastPBlockHolder* block_holder = nullptr; + RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR( + serialize_block(block, block_holder->get_block(), _channels.size())); + } + + for (auto channel : _channels) { + if (channel->is_local()) { + RETURN_IF_ERROR(channel->send_local_block(block)); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR(channel->send_block(block_holder, eos)); + } + } } else { { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -620,6 +642,28 @@ void VDataStreamSender::_roll_pb_block() { _cur_pb_block = (_cur_pb_block == &_pb_block1 ? 
&_pb_block2 : &_pb_block1); } +Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) { + constexpr int MAX_LOOP = 1000; + + size_t it = 0; + while (it < MAX_LOOP) { + if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { + _broadcast_pb_block_idx = 0; + } + + for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { + if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { + _broadcast_pb_block_idx++; + *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1]; + return Status::OK(); + } + } + it++; + } + return Status::InternalError( + "Exceed the max loop limit when acquire the next available buffer!"); +} + void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : _channels) { ((PipChannel*)channel)->registe(buffer); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index e644aaf847..19858c259d 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -49,6 +49,42 @@ namespace vectorized { class VExprContext; class Channel; +template <typename T> +struct AtomicWrapper { + std::atomic<T> _value; + + AtomicWrapper() : _value() {} + + AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {} + + AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {} + + AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); } +}; + +// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock +// will be shared between different channel, so we have to use a ref count to mark if this +// PBlock is available for next serialization. 
+class BroadcastPBlockHolder { +public: + BroadcastPBlockHolder() : _ref_count(0) {} + ~BroadcastPBlockHolder() noexcept = default; + + void unref() noexcept { + DCHECK_GT(_ref_count._value, 0); + _ref_count._value.fetch_sub(1); + } + void ref() noexcept { _ref_count._value.fetch_add(1); } + + bool available() { return _ref_count._value == 0; } + + PBlock* get_block() { return &pblock; } + +private: + AtomicWrapper<uint32_t> _ref_count; + PBlock pblock; +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -91,6 +127,7 @@ protected: friend class pipeline::ExchangeSinkBuffer; void _roll_pb_block(); + Status _get_next_available_buffer(BroadcastPBlockHolder** holder); Status get_partition_column_result(Block* block, int* result) const { int counter = 0; @@ -131,6 +168,10 @@ protected: PBlock _pb_block2; PBlock* _cur_pb_block; + // used by pipeline engine + std::vector<BroadcastPBlockHolder> _broadcast_pb_blocks; + int _broadcast_pb_block_idx; + // compute per-row partition values std::vector<VExprContext*> _partition_expr_ctxs; @@ -219,6 +260,10 @@ public: // if batch is nullptr, send the eof packet virtual Status send_block(PBlock* block, bool eos = false); + virtual Status send_block(BroadcastPBlockHolder* block, bool eos = false) { + return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); + } + Status add_rows(Block* block, const std::vector<int>& row); virtual Status send_current_block(bool eos); @@ -369,8 +414,21 @@ public: } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block( - {this, block ? 
std::make_unique<PBlock>(*block) : nullptr, eos})); + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + } + return Status::OK(); + } + + Status send_block(BroadcastPBlockHolder* block, bool eos = false) override { + if (eos) { + if (_eos_send) { + return Status::OK(); + } else { + _eos_send = true; + } + } + if (eos || block->get_block()->column_metas_size()) { + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org