This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0c264c8a14d [fix](pipelineX) fix scheduling bug in union operator (#27131)
0c264c8a14d is described below
commit 0c264c8a14db5b422a8c34abfe4a1aac5544d59e
Author: Mryange <[email protected]>
AuthorDate: Fri Nov 17 10:02:54 2023 +0800
[fix](pipelineX) fix scheduling bug in union operator (#27131)
---
be/src/pipeline/pipeline_x/dependency.h | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 1d575690f8e..e9635253b2b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -545,21 +545,16 @@ public:
_union_state = union_state;
}
void set_ready_for_write() override {}
- void set_ready_for_read() override {
- if (!_union_state->data_queue.is_all_finish()) {
- return;
- }
- if (_ready_for_read) {
- return;
- }
- _read_dependency_watcher.stop();
- _ready_for_read = true;
- }
+ void set_ready_for_read() override {}
[[nodiscard]] Dependency* read_blocked_by() override {
if (_union_state->child_count() == 0) {
return nullptr;
}
- return WriteDependency::read_blocked_by();
+ if (_union_state->data_queue.is_all_finish() ||
+ _union_state->data_queue.remaining_has_data()) {
+ return nullptr;
+ }
+ return this;
}
void block_reading() override {}
void block_writing() override {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]