Gabriel39 commented on code in PR #26247: URL: https://github.com/apache/doris/pull/26247#discussion_r1379758707
########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,76 @@ class PipelineXTask : public PipelineTask { // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { for (auto* op_dep : _read_dependencies) { auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return dep; } } - return true; + return nullptr; } - bool runtime_filters_are_ready_or_timeout() override { + bool source_can_read() override { + if (_dry_run) { + return true; + } + auto* dep = read_blocked_dependency(); + return dep == nullptr; + } + + RuntimeFilterDependency* filter_blocked_dependency() { auto* dep = _filter_dependency->filter_blocked_by(); if (dep != nullptr) { - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_RF); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; } - bool sink_can_write() override { + bool runtime_filters_are_ready_or_timeout() override { + auto* dep = filter_blocked_dependency(); + return dep == nullptr; + } + + WriteDependency* write_blocked_dependency() { auto* dep = _write_dependencies->write_blocked_by(); if (dep != nullptr) { dep->start_write_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SINK); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; + } + + bool sink_can_write() override { + auto* dep = write_blocked_dependency(); + return dep == nullptr; Review Comment: ```suggestion return write_blocked_dependency() == nullptr; ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,76 @@ class PipelineXTask : public PipelineTask { // must 
be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { for (auto* op_dep : _read_dependencies) { auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return dep; } } - return true; + return nullptr; } - bool runtime_filters_are_ready_or_timeout() override { + bool source_can_read() override { + if (_dry_run) { + return true; + } + auto* dep = read_blocked_dependency(); + return dep == nullptr; Review Comment: ```suggestion return read_blocked_dependency() == nullptr; ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.cpp: ########## @@ -348,4 +357,39 @@ std::string PipelineXTask::debug_string() { return fmt::to_string(debug_string_buffer); } +bool PipelineXTask::try_wake_up(Dependency* wake_up_dep) { + // call by dependency + auto state = get_state(); + VecDateTimeValue now = VecDateTimeValue::local_time(); + DCHECK(avoid_using_blocked_queue(state)); + Dependency* block_dep = nullptr; + if (state == PipelineTaskState::PENDING_FINISH) { + block_dep = finish_blocked_dependency(); + } else if (query_context()->is_cancelled()) { + return _make_run(); + } else if (query_context()->is_timeout(now)) { + query_context()->cancel(true, "", Status::Cancelled("")); + return _make_run(); + } else { + if (state == PipelineTaskState::BLOCKED_FOR_SOURCE) { + block_dep = read_blocked_dependency(); + } else if (state == PipelineTaskState::BLOCKED_FOR_RF) { + block_dep = filter_blocked_dependency(); + } else if (state == PipelineTaskState::BLOCKED_FOR_SINK) { + block_dep = write_blocked_dependency(); + } + } + if (block_dep == nullptr) { + return _make_run(); + } + // block_dep != nullptr , block task has trans to other dep + DCHECK(wake_up_dep != block_dep); + return 
false; +} + +bool PipelineXTask::_make_run(PipelineTaskState t_state) { + set_state(t_state); + static_cast<void>(get_task_queue()->push_back(this)); + return true; Review Comment: Use `void` instead of `bool` as the return type. ########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -21,10 +21,33 @@ #include <mutex> #include "common/logging.h" +#include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/memory/mem_tracker.h" namespace doris::pipeline { +void Dependency::add_block_task(PipelineXTask* task) { + std::unique_lock<std::mutex> lc(_task_lock); + DCHECK(task->get_state() != PipelineTaskState::RUNNABLE); + DCHECK(avoid_using_blocked_queue(task->get_state())); + _block_task.push_back(task); +} + +void Dependency::try_to_wake_up_task() { + std::unique_lock<std::mutex> lc(_task_lock); + if (_block_task.empty()) { + return; + } + for (auto* task : _block_task) { + DCHECK(task->get_state() != PipelineTaskState::RUNNABLE); + if (task->try_wake_up(this)) { Review Comment: What will the task do if the wake-up fails?
########## be/src/pipeline/pipeline_x/pipeline_x_task.cpp: ########## @@ -348,4 +357,39 @@ std::string PipelineXTask::debug_string() { return fmt::to_string(debug_string_buffer); } +bool PipelineXTask::try_wake_up(Dependency* wake_up_dep) { + // call by dependency + auto state = get_state(); + VecDateTimeValue now = VecDateTimeValue::local_time(); Review Comment: Use `DateV2<DateTimeV2Value>` ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,76 @@ class PipelineXTask : public PipelineTask { // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { for (auto* op_dep : _read_dependencies) { auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return dep; } } - return true; + return nullptr; } - bool runtime_filters_are_ready_or_timeout() override { + bool source_can_read() override { + if (_dry_run) { + return true; + } + auto* dep = read_blocked_dependency(); + return dep == nullptr; + } + + RuntimeFilterDependency* filter_blocked_dependency() { auto* dep = _filter_dependency->filter_blocked_by(); if (dep != nullptr) { - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_RF); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; } - bool sink_can_write() override { + bool runtime_filters_are_ready_or_timeout() override { + auto* dep = filter_blocked_dependency(); + return dep == nullptr; + } + + WriteDependency* write_blocked_dependency() { auto* dep = _write_dependencies->write_blocked_by(); if (dep != nullptr) { dep->start_write_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SINK); + 
dep->add_block_task(this); + return dep; } - return true; + return nullptr; + } + + bool sink_can_write() override { + auto* dep = write_blocked_dependency(); + return dep == nullptr; } Status finalize() override; std::string debug_string() override; - bool is_pending_finish() override { + FinishDependency* finish_blocked_dependency() { for (auto* fin_dep : _finish_dependencies) { auto* dep = fin_dep->finish_blocked_by(); if (dep != nullptr) { dep->start_finish_watcher(); - push_blocked_task_to_dependency(dep); - return true; + set_state(PipelineTaskState::PENDING_FINISH); + return dep; } } - return false; + return nullptr; + } + + bool is_pending_finish() override { + auto* dep = finish_blocked_dependency(); + return dep != nullptr; Review Comment: ```suggestion return finish_blocked_dependency() != nullptr; ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,76 @@ class PipelineXTask : public PipelineTask { // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { for (auto* op_dep : _read_dependencies) { auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return dep; } } - return true; + return nullptr; } - bool runtime_filters_are_ready_or_timeout() override { + bool source_can_read() override { + if (_dry_run) { + return true; + } + auto* dep = read_blocked_dependency(); + return dep == nullptr; + } + + RuntimeFilterDependency* filter_blocked_dependency() { auto* dep = _filter_dependency->filter_blocked_by(); if (dep != nullptr) { - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_RF); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; } - 
bool sink_can_write() override { + bool runtime_filters_are_ready_or_timeout() override { + auto* dep = filter_blocked_dependency(); + return dep == nullptr; Review Comment: ```suggestion return filter_blocked_dependency() == nullptr; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org