This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d1f2cd8e0 [Improvement](pipeline) Terminate early for short-circuit
join (#23378)
9d1f2cd8e0 is described below
commit 9d1f2cd8e0152f4af903d9a8f4e4fc505aeebd08
Author: Gabriel <[email protected]>
AuthorDate: Wed Aug 23 19:40:17 2023 +0800
[Improvement](pipeline) Terminate early for short-circuit join (#23378)
---
be/src/exec/exec_node.h | 2 ++
be/src/pipeline/exec/operator.h | 4 ++++
be/src/pipeline/pipeline_task.cpp | 8 +++----
be/src/pipeline/pipeline_task.h | 39 ++++++++++++++++++++++++++++++++--
be/src/vec/exec/join/vjoin_node_base.h | 2 ++
5 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ad7eb83074..ae7b40dc20 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -134,6 +134,8 @@ public:
bool can_read() const { return _can_read; }
+ [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
// Sink Data to ExecNode to do some stock work, both need impl with
method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 1d59025bf2..c6e41982a6 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -216,6 +216,8 @@ public:
virtual bool can_write() { return false; } // for sink
+ [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
/**
* The main method to execute a pipeline task.
* Now it is a pull-based pipeline and operators pull data from its child
by this method.
@@ -341,6 +343,8 @@ public:
~StreamingOperator() override = default;
+ [[nodiscard]] bool can_terminate_early() override { return
_node->can_terminate_early(); }
+
Status prepare(RuntimeState* state) override {
_node->increase_ref();
_use_projection = _node->has_output_row_descriptor();
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index dbf155d83c..3c9f82f987 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -238,11 +238,11 @@ Status PipelineTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
- if (!_source->can_read()) {
+ if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
- if (!_sink->can_write()) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
@@ -250,11 +250,11 @@ Status PipelineTask::execute(bool* eos) {
this->set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
- if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
+ if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
- if (!_sink->can_write()) {
+ if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index cf4a86270d..dea3cd27ff 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -155,13 +155,48 @@ public:
return false;
}
- virtual bool source_can_read() { return _source->can_read(); }
+ virtual bool source_can_read() { return _source->can_read() ||
ignore_blocking_source(); }
virtual bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}
- virtual bool sink_can_write() { return _sink->can_write(); }
+ /**
+ * Consider the query plan below:
+ *
+ * ExchangeSource JoinBuild1
+ * \ /
+ * JoinProbe1 (Right Outer) JoinBuild2
+ * \ /
+ * JoinProbe2 (Right Outer)
+ * |
+ * Sink
+ *
+ * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should
not be blocked by ExchangeSource
+ * because we have a determined conclusion that JoinProbe1/JoinProbe2 will
also output 0 rows.
+ *
+ * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked
by Sink because JoinProbe2 will
+ * produce more data.
+ *
+ * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be
blocked by ExchangeSource
+ * or Sink because JoinProbe2 will always produce 0 rows and terminate
early.
+ *
+ * In a nutshell, we should follow the rules:
+ * 1. if any operator in pipeline can terminate early, this task should
never be blocked by source operator.
+ * 2. if the last operator (except sink) can terminate early, this task
should never be blocked by sink operator.
+ */
+ [[nodiscard]] virtual bool ignore_blocking_sink() { return
_root->can_terminate_early(); }
+
+ [[nodiscard]] virtual bool ignore_blocking_source() {
+ for (size_t i = 1; i < _operators.size(); i++) {
+ if (_operators[i]->can_terminate_early()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ virtual bool sink_can_write() { return _sink->can_write() ||
ignore_blocking_sink(); }
virtual Status finalize();
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 7ebbd692e4..9864a27584 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -74,6 +74,8 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
+ [[nodiscard]] bool can_terminate_early() override { return
_short_circuit_for_probe; }
+
protected:
// Construct the intermediate blocks to store the results from join
operation.
void _construct_mutable_join_block();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]