This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7a98ac434f7 [Chore](case) pick some broadcast join case from #48247 #47380 (#48310)
7a98ac434f7 is described below
commit 7a98ac434f79a2060e2bac623da39701b7a9e3b4
Author: Pxl <[email protected]>
AuthorDate: Tue Feb 25 20:32:12 2025 +0800
[Chore](case) pick some broadcast join case from #48247 #47380 (#48310)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/pipeline.cpp | 9 +++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 25 ++++++
.../join/test_slow_close/test_slow_close.out | Bin 0 -> 133 bytes
.../join/test_slow_close/test_slow_close.groovy | 88 +++++++++++++++++++++
4 files changed, 122 insertions(+)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 450cb0c123d..7f7c086f38a 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -101,6 +101,15 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
}
void Pipeline::make_all_runnable() {
+ DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+ auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
+ if (pipeline_id == id()) {
+ LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
+ sleep(10);
+ }
+ });
+
if (_sink_x->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index bea8d52200d..d158057173e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -263,6 +263,17 @@ Status PipelineXTask::execute(bool* eos) {
// The status must be runnable
if (!_opened) {
+ DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+ auto required_pipeline_id =
+ DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.open_sleep", "pipeline_id", -1);
+ auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.open_sleep", "task_id", -1);
+ if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+ LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+ sleep(5);
+ }
+ });
if (_wake_up_early) {
*eos = true;
_eos = true;
@@ -352,6 +363,20 @@ Status PipelineXTask::execute(bool* eos) {
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
+
+ DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+ auto required_pipeline_id =
+ DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1);
+ auto required_task_id =
+ DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.sink_eos_sleep", "task_id", -1);
+ if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+ LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s";
+ sleep(10);
+ }
+ });
+
status = _sink->sink(_state, block, *eos);
if (status.is<ErrorCode::END_OF_FILE>()) {
set_wake_up_and_dep_ready();
diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000..5e4d8ec9448
Binary files /dev/null and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000..0b36d2da5ab
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+ sql "set disable_join_reorder=true;"
+ sql "set runtime_filter_type='bloom_filter';"
+ sql "set parallel_pipeline_task_num=3"
+ sql "set ignore_runtime_filter_ids='1,2';"
+ sql "set enable_runtime_filter_prune=false;"
+
+ sql """ drop table if exists t1; """
+ sql """ drop table if exists t3; """
+ sql """ drop table if exists t5; """
+
+ sql """
+ create table t1 (
+ k1 int null,
+ k2 int null
+ )
+ duplicate key (k1)
+ distributed BY hash(k1) buckets 16
+ properties("replication_num" = "1");
+ """
+
+ sql """
+ create table t3 (
+ k1 int null,
+ k2 int null
+ )
+ duplicate key (k1)
+ distributed BY hash(k1) buckets 16
+ properties("replication_num" = "1");
+
+ """
+
+ sql """
+ create table t5 (
+ k1 int null,
+ k2 int null
+ )
+ duplicate key (k1)
+ distributed BY hash(k1) buckets 16
+ properties("replication_num" = "1");
+ """
+
+ sql """
+ insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
+ """
+
+ sql """
+ insert into t3 values(1,1),(2,2),(3,3);
+ """
+
+ sql """
+ insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+ """
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
+ qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+ }
+
+ sql "set ignore_runtime_filter_ids='0';"
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id: 4, task_id: 7])
+ GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id: 4, task_id: 15])
+ qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+ GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]