This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a34cc6ed23 [Refactor](exchange) Remove unless variable and change 
block mem count way (#16668)
a34cc6ed23 is described below

commit a34cc6ed2326836551beeebb94bd1d0873c4c10c
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Feb 13 19:14:01 2023 +0800

    [Refactor](exchange) Remove unless variable and change block mem count way 
(#16668)
---
 be/src/vec/runtime/vdata_stream_recvr.cpp | 20 +++++++-------------
 be/src/vec/runtime/vdata_stream_recvr.h   | 14 +++++---------
 2 files changed, 12 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a1e0ea4c32..79cb7b5cfd 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -82,9 +82,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* 
block, bool* eos)
     _received_first_batch = true;
 
     DCHECK(!_block_queue.empty());
-    BlockUPtr next_block = std::move(_block_queue.front());
-    auto block_byte_size = block->allocated_bytes();
-    _recvr->_num_buffered_bytes -= block_byte_size;
+    auto [next_block, block_byte_size] = std::move(_block_queue.front());
     _recvr->_blocks_memory_usage->add(-block_byte_size);
     _block_queue.pop_front();
     _update_block_queue_empty();
@@ -156,9 +154,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
     COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
     COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
-    _recvr->_blocks_memory_usage->add(block_byte_size);
 
-    _block_queue.emplace_back(std::move(block));
+    _block_queue.emplace_back(std::move(block), block_byte_size);
     _update_block_queue_empty();
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -168,7 +165,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
         _pending_closures.emplace_back(*done, monotonicStopWatch);
         *done = nullptr;
     }
-    _recvr->_num_buffered_bytes += block_byte_size;
+    _recvr->_blocks_memory_usage->add(block_byte_size);
     _data_arrival_cv.notify_one();
 }
 
@@ -199,20 +196,18 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* 
block, bool use_move) {
     }
     materialize_block_inplace(*nblock);
 
-    size_t block_size = nblock->bytes();
-
+    size_t block_mem_size = nblock->allocated_bytes();
     std::unique_lock<std::mutex> l(_lock);
     if (_is_cancelled) {
         return;
     }
     COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_bytes_received);
-    _recvr->_blocks_memory_usage->add(nblock->allocated_bytes());
 
-    _block_queue.emplace_back(std::move(nblock));
+    _block_queue.emplace_back(std::move(nblock), block_mem_size);
     _update_block_queue_empty();
     _data_arrival_cv.notify_one();
 
-    if (_recvr->exceeds_limit(block_size)) {
+    if (_recvr->exceeds_limit(block_mem_size)) {
         // yiguolei
         // It is too tricky here, if the running thread is bthread then the 
tid may be wrong.
         std::thread::id tid = std::this_thread::get_id();
@@ -227,7 +222,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
         iter->second->wait(l);
     }
 
-    _recvr->_num_buffered_bytes += block_size;
+    _recvr->_blocks_memory_usage->add(block_mem_size);
 }
 
 void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
@@ -304,7 +299,6 @@ VDataStreamRecvr::VDataStreamRecvr(
           _row_desc(row_desc),
           _is_merging(is_merging),
           _is_closed(false),
-          _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
           _enable_pipeline(state->enable_pipeline_exec()) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 523d364fdf..1fc635a7f7 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -89,7 +89,8 @@ public:
     void close();
 
     bool exceeds_limit(int batch_size) {
-        return _num_buffered_bytes + batch_size > 
config::exchg_node_buffer_size_bytes;
+        return _blocks_memory_usage->current_value() + batch_size >
+               config::exchg_node_buffer_size_bytes;
     }
 
     bool is_closed() const { return _is_closed; }
@@ -119,7 +120,6 @@ private:
     bool _is_merging;
     bool _is_closed;
 
-    std::atomic<int> _num_buffered_bytes;
     std::unique_ptr<MemTracker> _mem_tracker;
     // Managed by object pool
     std::vector<SenderQueue*> _sender_queues;
@@ -185,7 +185,7 @@ protected:
     std::atomic_int _num_remaining_senders;
     std::condition_variable _data_arrival_cv;
     std::condition_variable _data_removal_cv;
-    std::list<BlockUPtr> _block_queue;
+    std::list<std::pair<BlockUPtr, size_t>> _block_queue;
     std::atomic_bool _block_queue_empty = true;
 
     bool _received_first_batch;
@@ -238,19 +238,15 @@ public:
         }
         materialize_block_inplace(*nblock);
 
-        size_t block_size = nblock->bytes();
         auto block_mem_size = nblock->allocated_bytes();
         {
             std::unique_lock<std::mutex> l(_lock);
-            _block_queue.emplace_back(std::move(nblock));
+            _block_queue.emplace_back(std::move(nblock), block_mem_size);
         }
-        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
+        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
         _recvr->_blocks_memory_usage->add(block_mem_size);
         _update_block_queue_empty();
         _data_arrival_cv.notify_one();
-
-        _recvr->_num_buffered_bytes += block_size;
-        COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
     }
 };
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to