This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e03b887a97a [opt](MultiCast) Avoid copying while holding a lock
(#37462) (#39816)
e03b887a97a is described below
commit e03b887a97ac274dcc321921f773b6fc716b94d7
Author: Mryange <[email protected]>
AuthorDate: Fri Aug 23 17:15:34 2024 +0800
[opt](MultiCast) Avoid copying while holding a lock (#37462) (#39816)
Previously, the copy was performed while holding the lock; now the block is
obtained under the lock and the copy is done after releasing it.
https://github.com/apache/doris/pull/37462
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
.../exec/multi_cast_data_stream_source.cpp | 5 -
.../pipeline/exec/multi_cast_data_stream_source.h | 2 -
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 120 ++++++++++++---------
be/src/pipeline/exec/multi_cast_data_streamer.h | 11 +-
4 files changed, 75 insertions(+), 63 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 91b6f083f79..6ef8982a64f 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -114,11 +114,6 @@ Status
MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
return Status::OK();
}
-Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
- _multi_cast_data_streamer->close_sender(_consumer_id);
- return OperatorBase::close(state);
-}
-
RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile()
const {
return _multi_cast_data_streamer->profile();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 8d14b4f266b..761e899c3d1 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -78,8 +78,6 @@ public:
bool can_read() override;
- Status close(doris::RuntimeState* state) override;
-
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override;
private:
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index cef85f45bb1..97fee2a32a9 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -23,81 +23,97 @@
namespace doris::pipeline {
-MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count,
size_t mem_size)
- : _used_count(used_count), _mem_size(mem_size) {
+MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int
un_finish_copy,
+ size_t mem_size)
+ : _used_count(used_count), _un_finish_copy(un_finish_copy),
_mem_size(mem_size) {
_block =
vectorized::Block::create_unique(block->get_columns_with_type_and_name());
block->clear();
}
Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block*
block, bool* eos) {
- std::lock_guard l(_mutex);
- auto& pos_to_pull = _sender_pos_to_read[sender_idx];
- if (pos_to_pull != _multi_cast_blocks.end()) {
- if (pos_to_pull->_used_count == 1) {
- DCHECK(pos_to_pull == _multi_cast_blocks.begin());
- pos_to_pull->_block->swap(*block);
-
- _cumulative_mem_size -= pos_to_pull->_mem_size;
- pos_to_pull++;
- _multi_cast_blocks.pop_front();
- } else {
- pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
-
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
- pos_to_pull->_used_count--;
- pos_to_pull++;
+ int* un_finish_copy = nullptr;
+ int use_count = 0;
+ {
+ std::lock_guard l(_mutex);
+ auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+ const auto end = _multi_cast_blocks.end();
+ DCHECK(pos_to_pull != end);
+
+ *block = *pos_to_pull->_block;
+
+ _cumulative_mem_size -= pos_to_pull->_mem_size;
+
+ pos_to_pull->_used_count--;
+ use_count = pos_to_pull->_used_count;
+ un_finish_copy = &pos_to_pull->_un_finish_copy;
+
+ pos_to_pull++;
+
+ if (pos_to_pull == end) {
+ _block_reading(sender_idx);
}
+
+ *eos = _eos and pos_to_pull == end;
}
- *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
- if (pos_to_pull == _multi_cast_blocks.end()) {
- _block_reading(sender_idx);
+
+ if (use_count == 0) {
+ // will clear _multi_cast_blocks
+ _wait_copy_block(block, *un_finish_copy);
+ } else {
+ _copy_block(block, *un_finish_copy);
}
+
return Status::OK();
}
-void MultiCastDataStreamer::close_sender(int sender_idx) {
- std::lock_guard l(_mutex);
- auto& pos_to_pull = _sender_pos_to_read[sender_idx];
- while (pos_to_pull != _multi_cast_blocks.end()) {
- if (pos_to_pull->_used_count == 1) {
- DCHECK(pos_to_pull == _multi_cast_blocks.begin());
- _cumulative_mem_size -= pos_to_pull->_mem_size;
- pos_to_pull++;
- _multi_cast_blocks.pop_front();
- } else {
- pos_to_pull->_used_count--;
- pos_to_pull++;
- }
+void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int&
un_finish_copy) {
+ const auto rows = block->rows();
+ for (int i = 0; i < block->columns(); ++i) {
+ block->get_by_position(i).column =
block->get_by_position(i).column->clone_resized(rows);
}
- _closed_sender_count++;
- _block_reading(sender_idx);
+
+ std::unique_lock l(_mutex);
+ un_finish_copy--;
+ if (un_finish_copy == 0) {
+ l.unlock();
+ _cv.notify_one();
+ }
+}
+
+void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int&
un_finish_copy) {
+ std::unique_lock l(_mutex);
+ _cv.wait(l, [&]() { return un_finish_copy == 0; });
+ _multi_cast_blocks.pop_front();
}
Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);
- auto block_mem_size = block->allocated_bytes();
- std::lock_guard l(_mutex);
- int need_process_count = _cast_sender_count - _closed_sender_count;
- if (need_process_count == 0) {
- return Status::EndOfFile("All data streamer is EOF");
- }
- // TODO: if the [queue back block rows + block->rows()] < batch_size,
better
- // do merge block. but need check the need_process_count and used_count
whether
- // equal
- _multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
+ const auto block_mem_size = block->allocated_bytes();
_cumulative_mem_size += block_mem_size;
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size,
_peak_mem_usage->value()));
- auto end = _multi_cast_blocks.end();
- end--;
- for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
- if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
- _sender_pos_to_read[i] = end;
- _set_ready_for_read(i);
+ {
+ std::lock_guard l(_mutex);
+ _multi_cast_blocks.emplace_back(block, _cast_sender_count,
_cast_sender_count - 1,
+ block_mem_size);
+ // last elem
+ auto end = std::prev(_multi_cast_blocks.end());
+ for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+ if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+ _sender_pos_to_read[i] = end;
+ _set_ready_for_read(i);
+ }
+ }
+ _eos = eos;
+ }
+
+ if (_eos) {
+ for (auto* read_dep : _dependencies) {
+ read_dep->set_always_ready();
}
}
- _eos = eos;
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 7631c7f7fdc..e718dc6d0e7 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -23,10 +23,11 @@ namespace doris::pipeline {
class Dependency;
struct MultiCastBlock {
- MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
+ MultiCastBlock(vectorized::Block* block, int used_count, int need_copy,
size_t mem_size);
std::unique_ptr<vectorized::Block> _block;
int _used_count;
+ int _un_finish_copy;
size_t _mem_size;
};
@@ -52,8 +53,6 @@ public:
Status pull(int sender_idx, vectorized::Block* block, bool* eos);
- void close_sender(int sender_idx);
-
Status push(RuntimeState* state, vectorized::Block* block, bool eos);
// use sink to check can_write, now always true after we support spill to
disk
@@ -84,14 +83,18 @@ private:
void _set_ready_for_read();
void _block_reading(int sender_idx);
+ void _copy_block(vectorized::Block* block, int& un_finish_copy);
+
+ void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);
+
const RowDescriptor& _row_desc;
RuntimeProfile* _profile = nullptr;
std::list<MultiCastBlock> _multi_cast_blocks;
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
+ std::condition_variable _cv;
std::mutex _mutex;
bool _eos = false;
int _cast_sender_count = 0;
- int _closed_sender_count = 0;
int64_t _cumulative_mem_size = 0;
RuntimeProfile::Counter* _process_rows = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]