github-actions[bot] commented on code in PR #30199:
URL: https://github.com/apache/doris/pull/30199#discussion_r1461627336


##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -57,9 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L 
* 1000L * 1000L;
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 
 struct BasicSharedState {
-    DependencySPtr source_dep = nullptr;
-    DependencySPtr sink_dep = nullptr;
+    Dependency* source_dep = nullptr;
+    Dependency* sink_dep = nullptr;
 
+    std::atomic_bool source_released_flag {false};
+    std::atomic_bool sink_released_flag {false};
+    std::mutex source_release_lock;
+    std::mutex sink_release_lock;
+
+    void release_source_dep() {

Review Comment:
   warning: method 'release_source_dep' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void release_source_dep() {
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.h:
##########
@@ -79,6 +83,11 @@ class MultiCastDataStreamer {
         _block_reading(sender_idx);
     }
 
+    void released_dependency(int sender_idx) {

Review Comment:
   warning: method 'released_dependency' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void released_dependency(int sender_idx) {
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -108,9 +121,50 @@
     virtual void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
+    void set_block_to_read() {
+        DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->source_dep != nullptr) << debug_string();
+        _shared_state->source_dep->block();
+    }
+    void set_ready_to_write() {

Review Comment:
   warning: method 'set_ready_to_write' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_ready_to_write() {
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -108,9 +121,50 @@
     virtual void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
+    void set_block_to_read() {

Review Comment:
   warning: method 'set_block_to_read' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_block_to_read() {
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -57,9 +57,22 @@
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 
 struct BasicSharedState {
-    DependencySPtr source_dep = nullptr;
-    DependencySPtr sink_dep = nullptr;
+    Dependency* source_dep = nullptr;
+    Dependency* sink_dep = nullptr;
 
+    std::atomic_bool source_released_flag {false};
+    std::atomic_bool sink_released_flag {false};
+    std::mutex source_release_lock;
+    std::mutex sink_release_lock;
+
+    void release_source_dep() {
+        std::unique_lock<std::mutex> lc(source_release_lock);
+        source_released_flag = true;
+    }
+    void release_sink_dep() {

Review Comment:
   warning: method 'release_sink_dep' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void release_sink_dep() {
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -108,9 +121,50 @@
     virtual void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
+    void set_block_to_read() {
+        DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->source_dep != nullptr) << debug_string();
+        _shared_state->source_dep->block();
+    }
+    void set_ready_to_write() {
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
+        _shared_state->sink_dep->set_ready();
+    }
+    void set_block_to_write() {

Review Comment:
   warning: method 'set_block_to_write' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static void set_block_to_write() {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to