This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e7423748 [pipeline](refactor) refactor pipeline task schedule logics (#22028)
23e7423748 is described below

commit 23e742374840338ba0df1b3802f66c1ee498a9d3
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Jul 25 17:18:26 2023 +0800

    [pipeline](refactor) refactor pipeline task schedule logics (#22028)
---
 be/src/exec/data_sink.h                  |  4 +++-
 be/src/pipeline/exec/operator.h          |  3 +--
 be/src/pipeline/pipeline_task.cpp        |  9 +++++++--
 be/src/pipeline/pipeline_task.h          |  2 ++
 be/src/pipeline/task_scheduler.cpp       | 30 +++++++++++++++++-----------
 be/src/vec/exec/scan/scanner_context.cpp |  1 +
 be/src/vec/exec/scan/vscan_node.cpp      |  4 +++-
 be/src/vec/sink/vdata_stream_sender.cpp  | 34 ++++++++++++++++++++++----------
 be/src/vec/sink/vdata_stream_sender.h    |  2 +-
 be/src/vec/sink/vtablet_sink.cpp         |  8 +++-----
 be/src/vec/sink/vtablet_sink.h           |  2 +-
 11 files changed, 64 insertions(+), 35 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index cf7b774fcd..fd59cd1d27 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -67,7 +67,9 @@ public:
         return Status::NotSupported("Not support send block");
     }
 
-    virtual void try_close(RuntimeState* state, Status exec_status) {}
+    [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) {
+        return Status::OK();
+    }
 
     virtual bool is_close_done() { return true; }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 12a117b4c4..acf55cb7bc 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -285,8 +285,7 @@ public:
     }
 
     Status try_close(RuntimeState* state) override {
-        _sink->try_close(state, state->query_status());
-        return Status::OK();
+        return _sink->try_close(state, state->query_status());
     }
 
     [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 411d9578ac..0e2041c488 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -284,8 +284,13 @@ Status PipelineTask::finalize() {
 }
 
 Status PipelineTask::try_close() {
-    _sink->try_close(_state);
-    return _source->try_close(_state);
+    if (_try_close_flag) {
+        return Status::OK();
+    }
+    _try_close_flag = true;
+    Status status1 = _sink->try_close(_state);
+    Status status2 = _source->try_close(_state);
+    return status1.ok() ? status2 : status1;
 }
 
 Status PipelineTask::close() {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 28ebf285e4..5cba2ef96e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -283,6 +283,8 @@ private:
     int _queue_level = 0;
     int _core_id = 0;
 
+    bool _try_close_flag = false;
+
     RuntimeProfile* _parent_profile;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 0d725950d4..4f9e90f80e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -327,28 +327,34 @@ void TaskScheduler::_do_work(size_t index) {
 }
 
 void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
-    // state only should be CANCELED or FINISHED
-    task->try_close();
     if (task->is_pending_finish()) {
         task->set_state(PipelineTaskState::PENDING_FINISH);
         _blocked_task_scheduler->add_blocked_task(task);
+        return;
+    }
+    auto status = task->try_close();
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
+        // Call `close` if `try_close` failed to make sure allocated resources are released
+        task->close();
+        task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                                         status.to_string());
+        state = PipelineTaskState::CANCELED;
+    } else if (task->is_pending_finish()) {
+        task->set_state(PipelineTaskState::PENDING_FINISH);
+        _blocked_task_scheduler->add_blocked_task(task);
+        return;
     } else {
-        auto status = task->close();
+        status = task->close();
         if (!status.ok() && state != PipelineTaskState::CANCELED) {
             task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                                              status.to_string());
             state = PipelineTaskState::CANCELED;
-        } else {
-            if (task->is_pending_finish()) {
-                task->set_state(PipelineTaskState::PENDING_FINISH);
-                _blocked_task_scheduler->add_blocked_task(task);
-                return;
-            }
         }
-        task->set_state(state);
-        task->set_close_pipeline_time();
-        task->fragment_context()->close_a_pipeline();
+        DCHECK(!task->is_pending_finish()) << task->debug_string();
     }
+    task->set_state(state);
+    task->set_close_pipeline_time();
+    task->fragment_context()->close_a_pipeline();
 }
 
 void TaskScheduler::shutdown() {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 2f535e4947..6570b5be65 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -299,6 +299,7 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
         if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
             break;
         } else {
+            DCHECK(!state->enable_pipeline_exec());
             while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
                 _ctx_finish_cv.wait(l);
             }
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 4c0dd28b4e..66bfe6386d 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -323,11 +323,13 @@ Status VScanNode::close(RuntimeState* state) {
 
 void VScanNode::release_resource(RuntimeState* state) {
     if (_scanner_ctx.get()) {
-        if (!state->enable_pipeline_exec() || _should_create_scanner) {
+        if (!state->enable_pipeline_exec()) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
             _scanner_ctx->set_should_stop();
             _scanner_ctx->clear_and_join(this, state);
+        } else if (_should_create_scanner) {
+            _scanner_ctx->clear_and_join(this, state);
         }
     }
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 97154c3e3f..29fb3446dc 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -672,11 +672,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     return Status::OK();
 }
 
-Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-
+Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
         Status st = _channels[i]->close(state);
@@ -684,13 +680,31 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
             final_st = st;
         }
     }
-    // wait all channels to finish
-    for (int i = 0; i < _channels.size(); ++i) {
-        Status st = _channels[i]->close_wait(state);
-        if (!st.ok() && final_st.ok()) {
-            final_st = st;
+    return final_st;
+}
+
+Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_st = Status::OK();
+    if (!state->enable_pipeline_exec()) {
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        // wait all channels to finish
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close_wait(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
         }
     }
+
     DataSink::close(state, exec_status);
     return final_st;
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 679557a92a..4dbe2625d9 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -92,7 +92,7 @@ public:
     Status open(RuntimeState* state) override;
 
     Status send(RuntimeState* state, Block* block, bool eos = false) override;
-
+    Status try_close(RuntimeState* state, Status exec_status) override;
     Status close(RuntimeState* state, Status exec_status) override;
     RuntimeProfile* profile() override { return _profile; }
 
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 79579cf2a0..e33f0f55a7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1320,11 +1320,7 @@ void VOlapTableSink::_cancel_all_channel(Status status) {
             print_id(_load_id), _txn_id, status);
 }
 
-void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
-    if (_try_close) {
-        return;
-    }
-
+Status VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
     if (status.ok()) {
@@ -1357,6 +1353,8 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
         _close_status = status;
         _try_close = true;
     }
+
+    return Status::OK();
 }
 
 bool VOlapTableSink::is_close_done() {
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 38c9e7c325..1e5d6ea2d7 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -478,7 +478,7 @@ public:
 
     Status open(RuntimeState* state) override;
 
-    void try_close(RuntimeState* state, Status exec_status) override;
+    Status try_close(RuntimeState* state, Status exec_status) override;
     // if true, all node channels rpc done, can start close().
     bool is_close_done() override;
     Status close(RuntimeState* state, Status close_status) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to