github-actions[bot] commented on code in PR #30199: URL: https://github.com/apache/doris/pull/30199#discussion_r1461627336
########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -57,9 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L; static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { - DependencySPtr source_dep = nullptr; - DependencySPtr sink_dep = nullptr; + Dependency* source_dep = nullptr; + Dependency* sink_dep = nullptr; + std::atomic_bool source_released_flag {false}; + std::atomic_bool sink_released_flag {false}; + std::mutex source_release_lock; + std::mutex sink_release_lock; + + void release_source_dep() { Review Comment: warning: method 'release_source_dep' can be made static [readability-convert-member-functions-to-static] ```suggestion static void release_source_dep() { ``` ########## be/src/pipeline/exec/multi_cast_data_streamer.h: ########## @@ -79,6 +83,11 @@ class MultiCastDataStreamer { _block_reading(sender_idx); } + void released_dependency(int sender_idx) { Review Comment: warning: method 'released_dependency' can be made static [readability-convert-member-functions-to-static] ```suggestion static void released_dependency(int sender_idx) { ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -108,9 +121,50 @@ virtual void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } + void set_block_to_read() { + DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } + DCHECK(_shared_state->source_dep != nullptr) << debug_string(); + 
_shared_state->source_dep->block(); + } + void set_ready_to_write() { Review Comment: warning: method 'set_ready_to_write' can be made static [readability-convert-member-functions-to-static] ```suggestion static void set_ready_to_write() { ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -108,9 +121,50 @@ virtual void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } + void set_block_to_read() { Review Comment: warning: method 'set_block_to_read' can be made static [readability-convert-member-functions-to-static] ```suggestion static void set_block_to_read() { ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -57,9 +57,22 @@ static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { - DependencySPtr source_dep = nullptr; - DependencySPtr sink_dep = nullptr; + Dependency* source_dep = nullptr; + Dependency* sink_dep = nullptr; + std::atomic_bool source_released_flag {false}; + std::atomic_bool sink_released_flag {false}; + std::mutex source_release_lock; + std::mutex sink_release_lock; + + void release_source_dep() { + std::unique_lock<std::mutex> lc(source_release_lock); + source_released_flag = true; + } + void release_sink_dep() { Review Comment: warning: method 'release_sink_dep' can be made static [readability-convert-member-functions-to-static] ```suggestion static void release_sink_dep() { ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -108,9 +121,50 @@ virtual void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + 
std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } + void set_block_to_read() { + DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } + DCHECK(_shared_state->source_dep != nullptr) << debug_string(); + _shared_state->source_dep->block(); + } + void set_ready_to_write() { + if (_shared_state->sink_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock); + if (_shared_state->sink_released_flag) { + return; + } + DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); + _shared_state->sink_dep->set_ready(); + } + void set_block_to_write() { Review Comment: warning: method 'set_block_to_write' can be made static [readability-convert-member-functions-to-static] ```suggestion static void set_block_to_write() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org