github-actions[bot] commented on code in PR #26879:
URL: https://github.com/apache/doris/pull/26879#discussion_r1390724032
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -21,10 +21,44 @@
#include <mutex>
#include "common/logging.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/memory/mem_tracker.h"
namespace doris::pipeline {
+void Dependency::add_block_task(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ DCHECK(task->get_state() != PipelineTaskState::RUNNABLE);
+ DCHECK(avoid_using_blocked_queue(task->get_state()));
+ DCHECK(task->get_state() != PipelineTaskState::BLOCKED_FOR_SINK);
+ _blocked_task.push_back(task);
+}
+
+void Dependency::_try_to_wake_up_task() {
Review Comment:
warning: method '_try_to_wake_up_task' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:102:
```diff
- void _try_to_wake_up_task();
+ static void _try_to_wake_up_task();
```
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +865,30 @@
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+ std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
+ void add_running_sink_operators() { running_sink_operators++; }
+ void sub_running_sink_operators() {
+ auto val = running_sink_operators.fetch_sub(1);
+ if (val == 1) {
+ _set_ready_for_read();
+ }
+ }
+ void _set_ready_for_read() {
+ for (auto* dep : source_dependencies) {
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
+ }
+ void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+ source_dependencies[channel_id] = dep;
+ dep->block_reading();
+ }
+ void set_ready_for_read(int channel_id) {
Review Comment:
warning: method 'set_ready_for_read' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void set_ready_for_read(int channel_id) {
```
##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,36 @@ Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
+ _set_ready_for_read(i);
}
}
_eos = eos;
return Status::OK();
}
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
Review Comment:
warning: method '_set_ready_for_read' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/multi_cast_data_streamer.h:82:
```diff
- void _set_ready_for_read(int sender_idx);
+ static void _set_ready_for_read(int sender_idx);
```
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -21,10 +21,44 @@
#include <mutex>
#include "common/logging.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/memory/mem_tracker.h"
namespace doris::pipeline {
+void Dependency::add_block_task(PipelineXTask* task) {
Review Comment:
warning: method 'add_block_task' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:99:
```diff
- void add_block_task(PipelineXTask* task);
+ static void add_block_task(PipelineXTask* task);
```
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +865,30 @@ struct LocalExchangeSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+ std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
+ void add_running_sink_operators() { running_sink_operators++; }
+ void sub_running_sink_operators() {
+ auto val = running_sink_operators.fetch_sub(1);
+ if (val == 1) {
+ _set_ready_for_read();
+ }
+ }
+ void _set_ready_for_read() {
+ for (auto* dep : source_dependencies) {
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
+ }
+ void set_dep_by_channel_id(Dependency* dep, int channel_id) {
Review Comment:
warning: method 'set_dep_by_channel_id' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void set_dep_by_channel_id(Dependency* dep, int channel_id) {
```
##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,36 @@
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
+ _set_ready_for_read(i);
}
}
_eos = eos;
return Status::OK();
}
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+ if (_dependencies.empty()) {
+ return;
+ }
+ auto* dep = _dependencies[sender_idx];
+ DCHECK(dep);
+ dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+ for (auto* dep : _dependencies) {
+ DCHECK(dep);
+ dep->set_ready_for_read();
+ }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {
Review Comment:
warning: method '_block_reading' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/exec/multi_cast_data_streamer.h:84:
```diff
- void _block_reading(int sender_idx);
+ static void _block_reading(int sender_idx);
```
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -294,6 +350,12 @@
return results;
}
+void SetSharedState::set_probe_finished_children(int child_id) {
Review Comment:
warning: method 'set_probe_finished_children' can be made static
[readability-convert-member-functions-to-static]
be/src/pipeline/pipeline_x/dependency.h:777:
```diff
- void set_probe_finished_children(int child_id);
+ static void set_probe_finished_children(int child_id);
```
##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -70,56 +70,64 @@ class PipelineXTask : public PipelineTask {
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status) override;
+ Dependency* read_blocked_dependency() {
Review Comment:
warning: method 'read_blocked_dependency' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Dependency* read_blocked_dependency() {
```
##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -70,56 +70,64 @@
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status) override;
+ Dependency* read_blocked_dependency() {
+ for (auto* op_dep : _read_dependencies) {
+ _blocked_dep = op_dep->read_blocked_by();
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_read_watcher();
+ return _blocked_dep;
+ }
+ }
+ return nullptr;
+ }
+
bool source_can_read() override {
if (_dry_run) {
return true;
}
- for (auto* op_dep : _read_dependencies) {
- auto* dep = op_dep->read_blocked_by();
- if (dep != nullptr) {
- dep->start_read_watcher();
- push_blocked_task_to_dependency(dep);
- return false;
- }
+ return read_blocked_dependency() == nullptr;
+ }
+
+ Dependency* filter_blocked_dependency() {
+ _blocked_dep = _filter_dependency->filter_blocked_by();
+ if (_blocked_dep != nullptr) {
+ return _blocked_dep;
}
- return true;
+ return nullptr;
}
bool runtime_filters_are_ready_or_timeout() override {
- auto* dep = _filter_dependency->filter_blocked_by();
- if (dep != nullptr) {
- push_blocked_task_to_dependency(dep);
- return false;
- }
- return true;
+ return filter_blocked_dependency() == nullptr;
}
- bool sink_can_write() override {
- auto* dep = _write_dependencies->write_blocked_by();
- if (dep != nullptr) {
- dep->start_write_watcher();
- push_blocked_task_to_dependency(dep);
- return false;
+ Dependency* write_blocked_dependency() {
+ _blocked_dep = _write_dependencies->write_blocked_by();
+ if (_blocked_dep != nullptr) {
+ static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher();
+ return _blocked_dep;
}
- return true;
+ return nullptr;
}
+ bool sink_can_write() override { return write_blocked_dependency() ==
nullptr; }
+
Status finalize() override;
std::string debug_string() override;
- bool is_pending_finish() override {
+ Dependency* finish_blocked_dependency() {
Review Comment:
warning: method 'finish_blocked_dependency' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Dependency* finish_blocked_dependency() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]