github-actions[bot] commented on code in PR #26711:
URL: https://github.com/apache/doris/pull/26711#discussion_r1388015052


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    if (!_has_dependencys) {
+        return;
+    }
+    auto* dep = _dependencys[sender_idx];
+    DCHECK(dep);
+    dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {

Review Comment:
   warning: method '_set_ready_for_read' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_set_ready_for_read() const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:80:
   ```diff
   -     void _set_ready_for_read();
   +     void _set_ready_for_read() const;
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +885,30 @@
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
+    void add_running_sink_operators() { running_sink_operators++; }
+    void sub_running_sink_operators() {
+        running_sink_operators--;
+        if (running_sink_operators == 0) {
+            _set_ready_for_read();
+        }
+    }
+    void _set_ready_for_read() {
+        for (auto* dep : source_dependencies) {
+            DCHECK(dep);
+            dep->set_ready_for_read();
+        }
+    }
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
+        source_dependencies[channel_id] = dep;
+        dep->block_reading();
+    }
+    void _set_ready_for_read(int channel_id) {

Review Comment:
   warning: method '_set_ready_for_read' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void _set_ready_for_read(int channel_id) {
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    if (!_has_dependencys) {
+        return;
+    }
+    auto* dep = _dependencys[sender_idx];
+    DCHECK(dep);
+    dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+    if (!_has_dependencys) {
+        return;
+    }
+    for (auto* dep : _dependencys) {
+        DCHECK(dep);
+        dep->set_ready_for_read();
+    }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {

Review Comment:
   warning: method '_block_reading' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_block_reading(int sender_idx) const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:81:
   ```diff
   -     void _block_reading(int sender_idx);
   +     void _block_reading(int sender_idx) const;
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -899,7 +885,30 @@ struct LocalExchangeSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::vector<Dependency*> source_dependencies;
     std::atomic<int> running_sink_operators = 0;
+    void add_running_sink_operators() { running_sink_operators++; }
+    void sub_running_sink_operators() {
+        running_sink_operators--;
+        if (running_sink_operators == 0) {
+            _set_ready_for_read();
+        }
+    }
+    void _set_ready_for_read() {
+        for (auto* dep : source_dependencies) {
+            DCHECK(dep);
+            dep->set_ready_for_read();
+        }
+    }
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {

Review Comment:
   warning: method 'set_dep_by_channel_id' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_dep_by_channel_id(Dependency* dep, int channel_id) {
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -87,10 +92,39 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {

Review Comment:
   warning: method '_set_ready_for_read' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) const {
   ```
   
   be/src/pipeline/exec/multi_cast_data_streamer.h:79:
   ```diff
   -     void _set_ready_for_read(int sender_idx);
   +     void _set_ready_for_read(int sender_idx) const;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to