This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 36337c8bd93 [fix](multicast) should not ignore Status of block::merge #35886 (#37869)
36337c8bd93 is described below
commit 36337c8bd935ce9dac3f152f60e96956bd55a18d
Author: TengJianPing <[email protected]>
AuthorDate: Tue Jul 16 19:03:24 2024 +0800
[fix](multicast) should not ignore Status of block::merge #35886 (#37869)
## Proposed changes
Backport (BP) of #35886 to branch-2.1.
---
be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 5 +++--
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 5 +++--
be/src/pipeline/exec/multi_cast_data_streamer.h | 2 +-
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 90c809c5359..91b6f083f79 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -96,7 +96,7 @@ Status
MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
if (!_output_expr_contexts.empty()) {
output_block = &tmp_block;
}
- _multi_cast_data_streamer->pull(_consumer_id, output_block, &eos);
+ RETURN_IF_ERROR(_multi_cast_data_streamer->pull(_consumer_id,
output_block, &eos));
if (!_conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
@@ -185,7 +185,8 @@ Status
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
if (!local_state._output_expr_contexts.empty()) {
output_block = &tmp_block;
}
- local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id,
output_block, eos);
+
RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer.pull(_consumer_id,
+
output_block, eos));
if (!local_state._conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 1b128ab6ca3..cef85f45bb1 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -29,7 +29,7 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int
used_count, size_t
block->clear();
}
-void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block*
block, bool* eos) {
+Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block*
block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
@@ -42,7 +42,7 @@ void MultiCastDataStreamer::pull(int sender_idx,
doris::vectorized::Block* block
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
- (void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block);
+
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
pos_to_pull->_used_count--;
pos_to_pull++;
}
@@ -51,6 +51,7 @@ void MultiCastDataStreamer::pull(int sender_idx,
doris::vectorized::Block* block
if (pos_to_pull == _multi_cast_blocks.end()) {
_block_reading(sender_idx);
}
+ return Status::OK();
}
void MultiCastDataStreamer::close_sender(int sender_idx) {
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2078a729227..7631c7f7fdc 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -50,7 +50,7 @@ public:
~MultiCastDataStreamer() = default;
- void pull(int sender_idx, vectorized::Block* block, bool* eos);
+ Status pull(int sender_idx, vectorized::Block* block, bool* eos);
void close_sender(int sender_idx);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]