This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e5696f  [Bug] Fix the core in data_stream_recvr. Remove the map in 
DataStreamRecvr and replace by vector<pair> (#3928)
5e5696f is described below

commit 5e5696fda2c26bf1f1e0dcc6045b280a314f7fbf
Author: HappenLee <[email protected]>
AuthorDate: Thu Jun 25 16:29:07 2020 +0800

    [Bug] Fix the core in data_stream_recvr. Remove the map in DataStreamRecvr 
and replace by vector<pair> (#3928)
    
    Before we use a map in DataStreamRecvr to save the StopWatch corresponding 
to the pending closures.
    But we need to take care of the consistency between the map and pending 
closures queue, it is very error-prone.
    If it is not consistent, BE will crash.
    So we remove the map in DataStreamRecvr and replace by 
vector<pair<Closure*, MonotonicStopWatch>>.
---
 be/src/runtime/data_stream_recvr.cc | 24 ++++++++++--------------
 be/src/runtime/data_stream_recvr.h  |  1 -
 2 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/data_stream_recvr.cc 
b/be/src/runtime/data_stream_recvr.cc
index d94d167..3261890 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -128,7 +128,7 @@ private:
     std::unordered_set<int> _sender_eos_set; // sender_id
     std::unordered_map<int, int64_t> _packet_seq_map; // be_number => 
packet_seq
 
-    std::deque<google::protobuf::Closure*> _pending_closures;
+    std::deque<std::pair<google::protobuf::Closure*, MonotonicStopWatch>> 
_pending_closures;
 };
 
 DataStreamRecvr::SenderQueue::SenderQueue(
@@ -176,14 +176,12 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** 
next_batch) {
     *next_batch = _current_batch.get();
 
     if (!_pending_closures.empty()) {
-        auto done = _pending_closures.front();
-        done->Run();
+        auto closure_pair = _pending_closures.front();
+        closure_pair.first->Run();
         _pending_closures.pop_front();
 
-        auto clock = _recvr->_closure_clock_map.at(done);
-        clock.stop();
-        _recvr->_buffer_full_total_timer->update(clock.elapsed_time());
-        _recvr->_closure_clock_map.erase(done);
+        closure_pair.second.stop();
+        
_recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
     }
 
     return Status::OK();
@@ -254,10 +252,8 @@ void DataStreamRecvr::SenderQueue::add_batch(
     if (done != nullptr && _recvr->exceeds_limit(batch_size)) {
         MonotonicStopWatch monotonicStopWatch;
         monotonicStopWatch.start();
-        _recvr->_closure_clock_map.insert(std::make_pair(*done, 
monotonicStopWatch));
-
         DCHECK(*done != nullptr);
-        _pending_closures.push_back(*done);
+        _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
     _recvr->_num_buffered_bytes += batch_size;
@@ -301,8 +297,8 @@ void DataStreamRecvr::SenderQueue::cancel() {
 
     {
         boost::lock_guard<boost::mutex> l(_lock);
-        for (auto done : _pending_closures) {
-            done->Run();
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
         }
         _pending_closures.clear();
     }
@@ -316,8 +312,8 @@ void DataStreamRecvr::SenderQueue::close() {
         boost::lock_guard<boost::mutex> l(_lock);
         _is_cancelled = true;
 
-        for (auto done : _pending_closures) {
-            done->Run();
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
         }
         _pending_closures.clear();
     }
diff --git a/be/src/runtime/data_stream_recvr.h 
b/be/src/runtime/data_stream_recvr.h
index 4a2aea6..104ee76 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -163,7 +163,6 @@ private:
     // receiver and placed in _sender_queue_pool.
     std::vector<SenderQueue*> _sender_queues;
 
-    std::map<google::protobuf::Closure*, MonotonicStopWatch> 
_closure_clock_map;
     // SortedRunMerger used to merge rows from different senders.
     boost::scoped_ptr<SortedRunMerger> _merger;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to