This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 23e7423748 [pipeline](refactor) refactor pipeline task schedule logics (#22028) 23e7423748 is described below commit 23e742374840338ba0df1b3802f66c1ee498a9d3 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Jul 25 17:18:26 2023 +0800 [pipeline](refactor) refactor pipeline task schedule logics (#22028) --- be/src/exec/data_sink.h | 4 +++- be/src/pipeline/exec/operator.h | 3 +-- be/src/pipeline/pipeline_task.cpp | 9 +++++++-- be/src/pipeline/pipeline_task.h | 2 ++ be/src/pipeline/task_scheduler.cpp | 30 +++++++++++++++++----------- be/src/vec/exec/scan/scanner_context.cpp | 1 + be/src/vec/exec/scan/vscan_node.cpp | 4 +++- be/src/vec/sink/vdata_stream_sender.cpp | 34 ++++++++++++++++++++++---------- be/src/vec/sink/vdata_stream_sender.h | 2 +- be/src/vec/sink/vtablet_sink.cpp | 8 +++----- be/src/vec/sink/vtablet_sink.h | 2 +- 11 files changed, 64 insertions(+), 35 deletions(-) diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index cf7b774fcd..fd59cd1d27 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -67,7 +67,9 @@ public: return Status::NotSupported("Not support send block"); } - virtual void try_close(RuntimeState* state, Status exec_status) {} + [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) { + return Status::OK(); + } virtual bool is_close_done() { return true; } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 12a117b4c4..acf55cb7bc 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -285,8 +285,7 @@ public: } Status try_close(RuntimeState* state) override { - _sink->try_close(state, state->query_status()); - return Status::OK(); + return _sink->try_close(state, state->query_status()); } [[nodiscard]] bool is_pending_finish() const override { return !_sink->is_close_done(); } diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp index 411d9578ac..0e2041c488 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -284,8 +284,13 @@ Status PipelineTask::finalize() { } Status PipelineTask::try_close() { - _sink->try_close(_state); - return _source->try_close(_state); + if (_try_close_flag) { + return Status::OK(); + } + _try_close_flag = true; + Status status1 = _sink->try_close(_state); + Status status2 = _source->try_close(_state); + return status1.ok() ? status2 : status1; } Status PipelineTask::close() { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 28ebf285e4..5cba2ef96e 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -283,6 +283,8 @@ private: int _queue_level = 0; int _core_id = 0; + bool _try_close_flag = false; + RuntimeProfile* _parent_profile; std::unique_ptr<RuntimeProfile> _task_profile; RuntimeProfile::Counter* _task_cpu_timer; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 0d725950d4..4f9e90f80e 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -327,28 +327,34 @@ void TaskScheduler::_do_work(size_t index) { } void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) { - // state only should be CANCELED or FINISHED - task->try_close(); if (task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); _blocked_task_scheduler->add_blocked_task(task); + return; + } + auto status = task->try_close(); + if (!status.ok() && state != PipelineTaskState::CANCELED) { + // Call `close` if `try_close` failed to make sure allocated resources are released + task->close(); + task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, + status.to_string()); + state = PipelineTaskState::CANCELED; + } else if (task->is_pending_finish()) { + task->set_state(PipelineTaskState::PENDING_FINISH); + 
_blocked_task_scheduler->add_blocked_task(task); + return; } else { - auto status = task->close(); + status = task->close(); if (!status.ok() && state != PipelineTaskState::CANCELED) { task->fragment_context()->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string()); state = PipelineTaskState::CANCELED; - } else { - if (task->is_pending_finish()) { - task->set_state(PipelineTaskState::PENDING_FINISH); - _blocked_task_scheduler->add_blocked_task(task); - return; - } } - task->set_state(state); - task->set_close_pipeline_time(); - task->fragment_context()->close_a_pipeline(); + DCHECK(!task->is_pending_finish()) << task->debug_string(); } + task->set_state(state); + task->set_close_pipeline_time(); + task->fragment_context()->close_a_pipeline(); } void TaskScheduler::shutdown() { diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 2f535e4947..6570b5be65 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -299,6 +299,7 @@ void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) { if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { break; } else { + DCHECK(!state->enable_pipeline_exec()); while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) { _ctx_finish_cv.wait(l); } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 4c0dd28b4e..66bfe6386d 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -323,11 +323,13 @@ Status VScanNode::close(RuntimeState* state) { void VScanNode::release_resource(RuntimeState* state) { if (_scanner_ctx.get()) { - if (!state->enable_pipeline_exec() || _should_create_scanner) { + if (!state->enable_pipeline_exec()) { // stop and wait the scanner scheduler to be done // _scanner_ctx may not be created for some short circuit case. 
_scanner_ctx->set_should_stop(); _scanner_ctx->clear_and_join(this, state); + } else if (_should_create_scanner) { + _scanner_ctx->clear_and_join(this, state); } } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 97154c3e3f..29fb3446dc 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -672,11 +672,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { return Status::OK(); } -Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { - if (_closed) { - return Status::OK(); - } - +Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { Status final_st = Status::OK(); for (int i = 0; i < _channels.size(); ++i) { Status st = _channels[i]->close(state); @@ -684,13 +680,31 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { final_st = st; } } - // wait all channels to finish - for (int i = 0; i < _channels.size(); ++i) { - Status st = _channels[i]->close_wait(state); - if (!st.ok() && final_st.ok()) { - final_st = st; + return final_st; +} + +Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + + Status final_st = Status::OK(); + if (!state->enable_pipeline_exec()) { + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + } + // wait all channels to finish + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close_wait(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } } } + DataSink::close(state, exec_status); return final_st; } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 679557a92a..4dbe2625d9 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -92,7 +92,7 @@ public: Status 
open(RuntimeState* state) override; Status send(RuntimeState* state, Block* block, bool eos = false) override; - + Status try_close(RuntimeState* state, Status exec_status) override; Status close(RuntimeState* state, Status exec_status) override; RuntimeProfile* profile() override { return _profile; } diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 79579cf2a0..e33f0f55a7 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -1320,11 +1320,7 @@ void VOlapTableSink::_cancel_all_channel(Status status) { print_id(_load_id), _txn_id, status); } -void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { - if (_try_close) { - return; - } - +Status VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_close_timer); Status status = exec_status; if (status.ok()) { @@ -1357,6 +1353,8 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { _close_status = status; _try_close = true; } + + return Status::OK(); } bool VOlapTableSink::is_close_done() { diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 38c9e7c325..1e5d6ea2d7 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -478,7 +478,7 @@ public: Status open(RuntimeState* state) override; - void try_close(RuntimeState* state, Status exec_status) override; + Status try_close(RuntimeState* state, Status exec_status) override; // if true, all node channels rpc done, can start close(). bool is_close_done() override; Status close(RuntimeState* state, Status close_status) override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org