This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a6925cc0cfb Fix exchange operator can not aware end of file (#25562) a6925cc0cfb is described below commit a6925cc0cfb1ddc31ef5a20b0fb9059fbdb2ab15 Author: wangbo <wan...@apache.org> AuthorDate: Fri Oct 20 18:56:01 2023 +0800 Fix exchange operator can not aware end of file (#25562) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++ be/src/pipeline/exec/operator.h | 7 +------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 31a8a1852a7..b3d2222e503 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -148,6 +148,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) { return Status::OK(); } TUniqueId ins_id = request.channel->_fragment_instance_id; + if (_is_receiver_eof(ins_id.lo)) { + return Status::EndOfFile("receiver eof"); + } bool send_now = false; { std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 4ba2aec977f..125f8fd89e4 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -295,12 +295,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { - auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished - if (st.template is<ErrorCode::END_OF_FILE>()) { - return Status::OK(); - } - return st; + return _sink->sink(state, in_block, source_state == SourceState::FINISHED); } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org