This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c964ed3a562 [fix](pipeline) Prevent concurrent accessing to 
dependencies (#35560) (#35596)
c964ed3a562 is described below

commit c964ed3a5625036b2977b916196735f9084582d7
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 16:04:11 2024 +0800

    [fix](pipeline) Prevent concurrent accessing to dependencies (#35560) 
(#35596)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    If a pipeline task is cancelled by another thread while it is executing
    `_extract_dependencies`, the dependencies may be accessed concurrently by
    different read/write threads, which can lead to serious problems.
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 29 +++++++++++++++++++-------
 be/src/pipeline/pipeline_x/pipeline_x_task.h   |  4 ++--
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 4e54ed99edb..c995b66997d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -109,9 +109,13 @@ Status PipelineXTask::prepare(const 
TPipelineInstanceParams& local_params, const
                 
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
     {
+        std::vector<Dependency*> filter_dependencies;
         const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
         std::copy(deps.begin(), deps.end(),
-                  std::inserter(_filter_dependencies, 
_filter_dependencies.end()));
+                  std::inserter(filter_dependencies, 
filter_dependencies.end()));
+
+        std::unique_lock<std::mutex> lc(_dependency_lock);
+        filter_dependencies.swap(_filter_dependencies);
     }
     // We should make sure initial state for task are runnable so that we can 
do some preparation jobs (e.g. initialize runtime filters).
     set_state(PipelineTaskState::RUNNABLE);
@@ -120,6 +124,9 @@ Status PipelineXTask::prepare(const 
TPipelineInstanceParams& local_params, const
 }
 
 Status PipelineXTask::_extract_dependencies() {
+    std::vector<Dependency*> read_dependencies;
+    std::vector<Dependency*> write_dependencies;
+    std::vector<Dependency*> finish_dependencies;
     for (auto op : _operators) {
         auto result = _state->get_local_state_result(op->operator_id());
         if (!result) {
@@ -128,10 +135,10 @@ Status PipelineXTask::_extract_dependencies() {
         auto* local_state = result.value();
         const auto& deps = local_state->dependencies();
         std::copy(deps.begin(), deps.end(),
-                  std::inserter(_read_dependencies, _read_dependencies.end()));
+                  std::inserter(read_dependencies, read_dependencies.end()));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
-            _finish_dependencies.push_back(fin_dep);
+            finish_dependencies.push_back(fin_dep);
         }
     }
     DBUG_EXECUTE_IF("fault_inject::PipelineXTask::_extract_dependencies", {
@@ -141,14 +148,20 @@ Status PipelineXTask::_extract_dependencies() {
     });
     {
         auto* local_state = _state->get_sink_local_state();
-        _write_dependencies = local_state->dependencies();
-        DCHECK(std::all_of(_write_dependencies.begin(), 
_write_dependencies.end(),
+        write_dependencies = local_state->dependencies();
+        DCHECK(std::all_of(write_dependencies.begin(), 
write_dependencies.end(),
                            [](auto* dep) { return dep->is_write_dependency(); 
}));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
-            _finish_dependencies.push_back(fin_dep);
+            finish_dependencies.push_back(fin_dep);
         }
     }
+    {
+        std::unique_lock<std::mutex> lc(_dependency_lock);
+        read_dependencies.swap(_read_dependencies);
+        write_dependencies.swap(_write_dependencies);
+        finish_dependencies.swap(_finish_dependencies);
+    }
     return Status::OK();
 }
 
@@ -379,7 +392,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_
 }
 void PipelineXTask::finalize() {
     PipelineTask::finalize();
-    std::unique_lock<std::mutex> lc(_release_lock);
+    std::unique_lock<std::mutex> lc(_dependency_lock);
     _finished = true;
     _sink_shared_state.reset();
     _op_shared_states.clear();
@@ -417,7 +430,7 @@ Status PipelineXTask::close_sink(Status exec_status) {
 }
 
 std::string PipelineXTask::debug_string() {
-    std::unique_lock<std::mutex> lc(_release_lock);
+    std::unique_lock<std::mutex> lc(_dependency_lock);
     fmt::memory_buffer debug_string_buffer;
 
     fmt::format_to(debug_string_buffer, "QueryId: {}\n", 
print_id(query_context()->query_id()));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 03bafd47992..ef55423e16c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -141,7 +141,7 @@ public:
     void clear_blocking_state() {
         // Another thread may call finalize to release all dependencies
         // And then it will core.
-        std::unique_lock<std::mutex> lc(_release_lock);
+        std::unique_lock<std::mutex> lc(_dependency_lock);
         if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && 
_blocked_dep) {
             _blocked_dep->set_ready();
             _blocked_dep = nullptr;
@@ -234,7 +234,7 @@ private:
     Dependency* _execution_dep = nullptr;
 
     std::atomic<bool> _finished {false};
-    std::mutex _release_lock;
+    std::mutex _dependency_lock;
 };
 
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to