This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c964ed3a562 [fix](pipeline) Prevent concurrent accessing to
dependencies (#35560) (#35596)
c964ed3a562 is described below
commit c964ed3a5625036b2977b916196735f9084582d7
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 16:04:11 2024 +0800
[fix](pipeline) Prevent concurrent accessing to dependencies (#35560)
(#35596)
Issue Number: close #xxx
<!--Describe your changes.-->
If a pipeline task is cancelled by another thread while it is executing
`extract_dependencies`, the dependencies may be accessed concurrently by
different reader/writer threads, which can lead to serious consequences
(e.g. data races or use-after-free crashes).
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 29 +++++++++++++++++++-------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 4 ++--
2 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 4e54ed99edb..c995b66997d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -109,9 +109,13 @@ Status PipelineXTask::prepare(const
TPipelineInstanceParams& local_params, const
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
}
{
+ std::vector<Dependency*> filter_dependencies;
const auto& deps =
_state->get_local_state(_source->operator_id())->filter_dependencies();
std::copy(deps.begin(), deps.end(),
- std::inserter(_filter_dependencies,
_filter_dependencies.end()));
+ std::inserter(filter_dependencies,
filter_dependencies.end()));
+
+ std::unique_lock<std::mutex> lc(_dependency_lock);
+ filter_dependencies.swap(_filter_dependencies);
}
// We should make sure initial state for task are runnable so that we can
do some preparation jobs (e.g. initialize runtime filters).
set_state(PipelineTaskState::RUNNABLE);
@@ -120,6 +124,9 @@ Status PipelineXTask::prepare(const
TPipelineInstanceParams& local_params, const
}
Status PipelineXTask::_extract_dependencies() {
+ std::vector<Dependency*> read_dependencies;
+ std::vector<Dependency*> write_dependencies;
+ std::vector<Dependency*> finish_dependencies;
for (auto op : _operators) {
auto result = _state->get_local_state_result(op->operator_id());
if (!result) {
@@ -128,10 +135,10 @@ Status PipelineXTask::_extract_dependencies() {
auto* local_state = result.value();
const auto& deps = local_state->dependencies();
std::copy(deps.begin(), deps.end(),
- std::inserter(_read_dependencies, _read_dependencies.end()));
+ std::inserter(read_dependencies, read_dependencies.end()));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
- _finish_dependencies.push_back(fin_dep);
+ finish_dependencies.push_back(fin_dep);
}
}
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::_extract_dependencies", {
@@ -141,14 +148,20 @@ Status PipelineXTask::_extract_dependencies() {
});
{
auto* local_state = _state->get_sink_local_state();
- _write_dependencies = local_state->dependencies();
- DCHECK(std::all_of(_write_dependencies.begin(),
_write_dependencies.end(),
+ write_dependencies = local_state->dependencies();
+ DCHECK(std::all_of(write_dependencies.begin(),
write_dependencies.end(),
[](auto* dep) { return dep->is_write_dependency();
}));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
- _finish_dependencies.push_back(fin_dep);
+ finish_dependencies.push_back(fin_dep);
}
}
+ {
+ std::unique_lock<std::mutex> lc(_dependency_lock);
+ read_dependencies.swap(_read_dependencies);
+ write_dependencies.swap(_write_dependencies);
+ finish_dependencies.swap(_finish_dependencies);
+ }
return Status::OK();
}
@@ -379,7 +392,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState*
state, int64_t revocable_
}
void PipelineXTask::finalize() {
PipelineTask::finalize();
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
_finished = true;
_sink_shared_state.reset();
_op_shared_states.clear();
@@ -417,7 +430,7 @@ Status PipelineXTask::close_sink(Status exec_status) {
}
std::string PipelineXTask::debug_string() {
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "QueryId: {}\n",
print_id(query_context()->query_id()));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 03bafd47992..ef55423e16c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -141,7 +141,7 @@ public:
void clear_blocking_state() {
// Another thread may call finalize to release all dependencies
// And then it will core.
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH &&
_blocked_dep) {
_blocked_dep->set_ready();
_blocked_dep = nullptr;
@@ -234,7 +234,7 @@ private:
Dependency* _execution_dep = nullptr;
std::atomic<bool> _finished {false};
- std::mutex _release_lock;
+ std::mutex _dependency_lock;
};
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]