This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2ed122b787f [improvement](task exec context) add parent class
HasTaskExecutionCtx to own the task ctx (#29388)
2ed122b787f is described below
commit 2ed122b787f673797a4e9e20b4af4e4ece149fcd
Author: yiguolei <[email protected]>
AuthorDate: Tue Jan 2 15:28:27 2024 +0800
[improvement](task exec context) add parent class HasTaskExecutionCtx to
own the task ctx (#29388)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/task_scheduler.cpp | 3 +--
be/src/runtime/task_execution_context.h | 26 +++++++++++++++++++++++++-
be/src/vec/exec/scan/scanner_context.cpp | 10 ++++------
be/src/vec/exec/scan/scanner_context.h | 6 ++----
be/src/vec/exec/scan/scanner_scheduler.cpp | 4 ++--
5 files changed, 34 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index e814e4cdf2d..91980aeec7a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -345,8 +345,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task,
PipelineTaskState state,
Status exec_status) {
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access fragment context it self.
- std::shared_ptr<TaskExecutionContext> lock_for_context =
- task->fragment_context()->shared_from_this();
+ auto lock_for_context = task->fragment_context()->shared_from_this();
auto status = task->try_close(exec_status);
auto cancel = [&]() {
task->query_context()->cancel(true, status.to_string(),
diff --git a/be/src/runtime/task_execution_context.h
b/be/src/runtime/task_execution_context.h
index 08564840f59..c876ed5cb0d 100644
--- a/be/src/runtime/task_execution_context.h
+++ b/be/src/runtime/task_execution_context.h
@@ -21,11 +21,35 @@
namespace doris {
-// This class act as a super class of all context like things
+// This class acts as the super class of all context-like things, such as the
+// plan fragment executor, PipelineFragmentContext or PipelineXFragmentContext.
class TaskExecutionContext : public
std::enable_shared_from_this<TaskExecutionContext> {
public:
TaskExecutionContext() = default;
virtual ~TaskExecutionContext() = default;
};
+using TaskExecutionContextSPtr = std::shared_ptr<TaskExecutionContext>;
+
+// The task execution context (plan fragment executor, PipelineFragmentContext, ...) is created
+// in the main thread (e.g. FragmentExecThread) but may be used by other threads (scanner threads,
+// brpc sender queue) while the main thread may destroy it. Only a weak ref is kept here, so those
+// threads must hold the ptr returned by task_exec_ctx() and bail out if it is nullptr.
+struct HasTaskExecutionCtx {
+    using Weak = TaskExecutionContextSPtr::weak_type;
+
+    explicit HasTaskExecutionCtx(const TaskExecutionContextSPtr& task_exec_ctx)
+            : _task_exec_ctx(task_exec_ctx) {}
+
+    // Init from `state`, which must provide get_task_execution_context(), e.g. RuntimeState.
+    template <typename T>
+    explicit HasTaskExecutionCtx(T* state) : _task_exec_ctx(state->get_task_execution_context()) {}
+
+    // Returns a strong ref that keeps the context alive while held; nullptr if it is gone/unset.
+    [[nodiscard]] TaskExecutionContextSPtr task_exec_ctx() const { return _task_exec_ctx.lock(); }
+
+private:
+    Weak _task_exec_ctx;
+};
+
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 16bb1ce8487..9c87967f505 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
pipeline::ScanLocalStateBase* local_state,
std::shared_ptr<pipeline::ScanDependency>
dependency,
std::shared_ptr<pipeline::Dependency>
finish_dependency)
- : _state(state),
+ : HasTaskExecutionCtx(state),
+ _state(state),
_parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
@@ -72,8 +73,6 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
_finish_dependency(finish_dependency) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
- // Use the task exec context as a lock between scanner threads and
fragment exection threads
- _task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
@@ -102,7 +101,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
const std::list<VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
- : _state(state),
+ : HasTaskExecutionCtx(state),
+ _state(state),
_parent(parent),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
@@ -120,8 +120,6 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
- // Use the task exec context as a lock between scanner threads and
fragment exection threads
- _task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
if (_scanners.empty()) {
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 035d396bf65..8e840a47465 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -65,7 +65,8 @@ class SimplifiedScanScheduler;
// ScannerContext is also the scheduling unit of ScannerScheduler.
// ScannerScheduler schedules a ScannerContext at a time,
// and submits the Scanners to the scanner thread pool for data scanning.
-class ScannerContext : public std::enable_shared_from_this<ScannerContext> {
+class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
+ public HasTaskExecutionCtx {
ENABLE_FACTORY_CREATOR(ScannerContext);
public:
@@ -180,8 +181,6 @@ public:
bool _should_reset_thread_name = true;
- std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return
_task_exec_ctx; }
-
private:
template <typename Parent>
Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
@@ -199,7 +198,6 @@ protected:
void _set_scanner_done();
RuntimeState* _state = nullptr;
- std::weak_ptr<TaskExecutionContext> _task_exec_ctx;
VScanNode* _parent = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 6ec83e8bd6a..29b53b39353 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -172,7 +172,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx)
{
- auto task_lock = ctx->get_task_execution_context().lock();
+ auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " <<
print_id(_query_id)
// << " maybe finished";
@@ -266,7 +266,7 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
VScannerSPtr scanner) {
- auto task_lock = ctx->get_task_execution_context().lock();
+ auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " <<
print_id(_query_id)
// << " maybe finished";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]