This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 601120db04 [Bug](pipeline) access map may cause coredump in sink buffer (#21108) 601120db04 is described below commit 601120db04325547cced54e12d38c6994755434d Author: HappenLee <happen...@hotmail.com> AuthorDate: Sat Jun 24 23:03:59 2023 +0800 [Bug](pipeline) access map may cause coredump in sink buffer (#21108) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 25 ++++++++----------------- be/src/pipeline/exec/exchange_sink_buffer.h | 6 +++--- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index c0f0e921e9..e8b3f76fda 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -59,11 +59,8 @@ ExchangeSinkBuffer::~ExchangeSinkBuffer() = default; void ExchangeSinkBuffer::close() { for (const auto& pair : _instance_to_request) { - if (pair.second) { - pair.second->release_finst_id(); - pair.second->release_query_id(); - delete pair.second; - } + pair.second->release_finst_id(); + pair.second->release_query_id(); } _instance_to_request.clear(); } @@ -104,10 +101,10 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); - _instance_to_finst_id[low_id] = finst_id; _instance_to_sending_by_pipeline[low_id] = true; _instance_to_receiver_eof[low_id] = false; _instance_to_rpc_time[low_id] = 0; + _construct_request(low_id, finst_id); } Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { @@ -173,10 +170,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (!q.empty()) { // If we have data to shuffle which is not broadcasted auto& request = q.front(); - if (!_instance_to_request[id]) { - _construct_request(id); - } - auto brpc_request = _instance_to_request[id]; + auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); if (request.block) { @@ -220,10 +214,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted auto& request = broadcast_q.front(); - if (!_instance_to_request[id]) { - _construct_request(id); - } - auto brpc_request = _instance_to_request[id]; + auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); if (request.block_holder->get_block()) { @@ -272,9 +263,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { return Status::OK(); } -void ExchangeSinkBuffer::_construct_request(InstanceLoId id) { - _instance_to_request[id] = new PTransmitDataParams(); - _instance_to_request[id]->set_allocated_finst_id(&_instance_to_finst_id[id]); +void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id) { + _instance_to_request[id] = std::make_unique<PTransmitDataParams>(); + _instance_to_request[id]->mutable_finst_id()->CopyFrom(finst_id); _instance_to_request[id]->set_allocated_query_id(&_query_id); _instance_to_request[id]->set_node_id(_dest_node_id); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index ccc5e2afff..dcea246f91 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -179,9 +179,9 @@ private: _instance_to_broadcast_package_queue; using PackageSeq = int64_t; // must init zero + // TODO: make all flat_hash_map to a STRUT phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq; - phmap::flat_hash_map<InstanceLoId, PTransmitDataParams*> _instance_to_request; - phmap::flat_hash_map<InstanceLoId, PUniqueId> _instance_to_finst_id; + phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> _instance_to_request; phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline; phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof; phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time; @@ -197,7 +197,7 @@ private: Status _send_rpc(InstanceLoId); // must hold the _instance_to_package_queue_mutex[id] mutex to opera - void _construct_request(InstanceLoId id); + void _construct_request(InstanceLoId id, PUniqueId); inline void _ended(InstanceLoId id); inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org