This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a34cc6ed23 [Refactor](exchange) Remove unless variable and change block mem count way (#16668) a34cc6ed23 is described below commit a34cc6ed2326836551beeebb94bd1d0873c4c10c Author: HappenLee <happen...@hotmail.com> AuthorDate: Mon Feb 13 19:14:01 2023 +0800 [Refactor](exchange) Remove unless variable and change block mem count way (#16668) --- be/src/vec/runtime/vdata_stream_recvr.cpp | 20 +++++++------------- be/src/vec/runtime/vdata_stream_recvr.h | 14 +++++--------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index a1e0ea4c32..79cb7b5cfd 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -82,9 +82,7 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* eos) _received_first_batch = true; DCHECK(!_block_queue.empty()); - BlockUPtr next_block = std::move(_block_queue.front()); - auto block_byte_size = block->allocated_bytes(); - _recvr->_num_buffered_bytes -= block_byte_size; + auto [next_block, block_byte_size] = std::move(_block_queue.front()); _recvr->_blocks_memory_usage->add(-block_byte_size); _block_queue.pop_front(); _update_block_queue_empty(); @@ -156,9 +154,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time); COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); - _recvr->_blocks_memory_usage->add(block_byte_size); - _block_queue.emplace_back(std::move(block)); + _block_queue.emplace_back(std::move(block), block_byte_size); _update_block_queue_empty(); // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { @@ -168,7 +165,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe _pending_closures.emplace_back(*done, monotonicStopWatch); *done = nullptr; } - _recvr->_num_buffered_bytes += block_byte_size; + _recvr->_blocks_memory_usage->add(block_byte_size); _data_arrival_cv.notify_one(); } @@ -199,20 +196,18 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { } materialize_block_inplace(*nblock); - size_t block_size = nblock->bytes(); - + size_t block_mem_size = nblock->allocated_bytes(); std::unique_lock<std::mutex> l(_lock); if (_is_cancelled) { return; } COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received); - _recvr->_blocks_memory_usage->add(nblock->allocated_bytes()); - _block_queue.emplace_back(std::move(nblock)); + _block_queue.emplace_back(std::move(nblock), block_mem_size); _update_block_queue_empty(); _data_arrival_cv.notify_one(); - if (_recvr->exceeds_limit(block_size)) { + if (_recvr->exceeds_limit(block_mem_size)) { // yiguolei // It is too tricky here, if the running thread is bthread then the tid may be wrong. std::thread::id tid = std::this_thread::get_id(); @@ -227,7 +222,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { iter->second->wait(l); } - _recvr->_num_buffered_bytes += block_size; + _recvr->_blocks_memory_usage->add(block_mem_size); } void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { @@ -304,7 +299,6 @@ VDataStreamRecvr::VDataStreamRecvr( _row_desc(row_desc), _is_merging(is_merging), _is_closed(false), - _num_buffered_bytes(0), _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), _enable_pipeline(state->enable_pipeline_exec()) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 523d364fdf..1fc635a7f7 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -89,7 +89,8 @@ public: void close(); bool exceeds_limit(int batch_size) { - return _num_buffered_bytes + batch_size > config::exchg_node_buffer_size_bytes; + return _blocks_memory_usage->current_value() + batch_size > + config::exchg_node_buffer_size_bytes; } bool is_closed() const { return _is_closed; } @@ -119,7 +120,6 @@ private: bool _is_merging; bool _is_closed; - std::atomic<int> _num_buffered_bytes; std::unique_ptr<MemTracker> _mem_tracker; // Managed by object pool std::vector<SenderQueue*> _sender_queues; @@ -185,7 +185,7 @@ protected: std::atomic_int _num_remaining_senders; std::condition_variable _data_arrival_cv; std::condition_variable _data_removal_cv; - std::list<BlockUPtr> _block_queue; + std::list<std::pair<BlockUPtr, size_t>> _block_queue; std::atomic_bool _block_queue_empty = true; bool _received_first_batch; @@ -238,19 +238,15 @@ public: } materialize_block_inplace(*nblock); - size_t block_size = nblock->bytes(); auto block_mem_size = nblock->allocated_bytes(); { std::unique_lock<std::mutex> l(_lock); - _block_queue.emplace_back(std::move(nblock)); + _block_queue.emplace_back(std::move(nblock), block_mem_size); } - COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size); + COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size); _recvr->_blocks_memory_usage->add(block_mem_size); _update_block_queue_empty(); _data_arrival_cv.notify_one(); - - _recvr->_num_buffered_bytes += block_size; - COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size); } }; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org