github-actions[bot] commented on code in PR #26340: URL: https://github.com/apache/doris/pull/26340#discussion_r1380885733
########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -21,10 +21,33 @@ #include <mutex> #include "common/logging.h" +#include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/memory/mem_tracker.h" namespace doris::pipeline { +void Dependency::add_block_task(PipelineXTask* task) { + std::unique_lock<std::mutex> lc(_task_lock); + DCHECK(task->get_state() != PipelineTaskState::RUNNABLE); + DCHECK(avoid_using_blocked_queue(task->get_state())); + _block_task.push_back(task); +} + +void Dependency::try_to_wake_up_task() { Review Comment: warning: method 'try_to_wake_up_task' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_x/dependency.h:106: ```diff - void try_to_wake_up_task(); + static void try_to_wake_up_task(); ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,77 @@ // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { for (auto* op_dep : _read_dependencies) { auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return dep; } } - return true; + return nullptr; } - bool runtime_filters_are_ready_or_timeout() override { + bool source_can_read() override { + if (_dry_run) { + return true; + } + auto* dep = read_blocked_dependency(); + return dep == nullptr; + } + + RuntimeFilterDependency* filter_blocked_dependency() { auto* dep = _filter_dependency->filter_blocked_by(); if (dep != nullptr) { - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_RF); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; } - bool sink_can_write() override { + bool runtime_filters_are_ready_or_timeout() override { + auto* dep = filter_blocked_dependency(); + return dep == nullptr; + } + + WriteDependency* write_blocked_dependency() { auto* dep = _write_dependencies->write_blocked_by(); if (dep != nullptr) { dep->start_write_watcher(); - push_blocked_task_to_dependency(dep); - return false; + set_state(PipelineTaskState::BLOCKED_FOR_SINK); + dep->add_block_task(this); + return dep; } - return true; + return nullptr; + } + + bool sink_can_write() override { + auto* dep = write_blocked_dependency(); + return dep == nullptr; } Status finalize() override; std::string debug_string() override; - bool is_pending_finish() override { + FinishDependency* finish_blocked_dependency() { Review Comment: warning: method 'finish_blocked_dependency' can be made static [readability-convert-member-functions-to-static] ```suggestion static FinishDependency* finish_blocked_dependency() { ``` ########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -21,10 +21,33 @@ #include <mutex> #include "common/logging.h" +#include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/memory/mem_tracker.h" namespace doris::pipeline { +void Dependency::add_block_task(PipelineXTask* task) { Review Comment: warning: method 'add_block_task' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_x/dependency.h:104: ```diff - void add_block_task(PipelineXTask* task); + static void add_block_task(PipelineXTask* task); ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.h: ########## @@ -70,54 +70,77 @@ class PipelineXTask : public PipelineTask { // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; - bool source_can_read() override { - if (_dry_run) { - return true; - } + Dependency* read_blocked_dependency() { Review Comment: warning: method 'read_blocked_dependency' can be made static [readability-convert-member-functions-to-static] ```suggestion static Dependency* read_blocked_dependency() { ``` ########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -294,6 +317,18 @@ return results; } +void SetSharedState::set_probe_finished_children(int child_id) { Review Comment: warning: method 'set_probe_finished_children' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_x/dependency.h:786: ```diff - void set_probe_finished_children(int child_id); + static void set_probe_finished_children(int child_id); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org