This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6925cc0cfb Fix exchange operator can not aware end of file (#25562)
a6925cc0cfb is described below

commit a6925cc0cfb1ddc31ef5a20b0fb9059fbdb2ab15
Author: wangbo <wan...@apache.org>
AuthorDate: Fri Oct 20 18:56:01 2023 +0800

    Fix exchange operator can not aware end of file (#25562)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++
 be/src/pipeline/exec/operator.h               | 7 +------
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 31a8a1852a7..b3d2222e503 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -148,6 +148,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
         return Status::OK();
     }
     TUniqueId ins_id = request.channel->_fragment_instance_id;
+    if (_is_receiver_eof(ins_id.lo)) {
+        return Status::EndOfFile("receiver eof");
+    }
     bool send_now = false;
     {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4ba2aec977f..125f8fd89e4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -295,12 +295,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
-            auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED);
-            // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
-            if (st.template is<ErrorCode::END_OF_FILE>()) {
-                return Status::OK();
-            }
-            return st;
+            return _sink->sink(state, in_block, source_state == SourceState::FINISHED);
         }
         return Status::OK();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to