This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b546cee07e4 branch-4.0: [fix](coordinator) fix legacy coordinator use
1 instance for shuffle fragment when children has multiple instances #59295
(#59334)
b546cee07e4 is described below
commit b546cee07e4f433130ee0ff94fcc6c4ae8fb5fe6
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 26 18:24:01 2025 +0800
branch-4.0: [fix](coordinator) fix legacy coordinator use 1 instance for
shuffle fragment when children has multiple instances #59295 (#59334)
Cherry-picked from #59295
Co-authored-by: 924060929 <[email protected]>
---
.../main/java/org/apache/doris/qe/Coordinator.java | 12 ++-
.../org/apache/doris/qe/OldCoordinatorTest.java | 102 +++++++++++++++++++++
2 files changed, 109 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c0b9f2be457..f6c929b6526 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1988,15 +1988,17 @@ public class Coordinator implements CoordInterface {
private int findMaxParallelFragmentIndex(PlanFragment fragment) {
Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment
has no children");
- // exclude broadcast join right side's child fragments
- List<PlanFragment> childFragmentCandidates =
fragment.getChildren().stream()
- .filter(e -> e.getOutputPartition() !=
DataPartition.UNPARTITIONED)
- .collect(Collectors.toList());
+ List<PlanFragment> childFragmentCandidates = fragment.getChildren();
int maxParallelism = 0;
int maxParaIndex = 0;
for (int i = 0; i < childFragmentCandidates.size(); i++) {
- PlanFragmentId childFragmentId =
childFragmentCandidates.get(i).getFragmentId();
+ PlanFragment planFragment = childFragmentCandidates.get(i);
+ // exclude broadcast join right side's child fragments
+ if (planFragment.getOutputPartition() ==
DataPartition.UNPARTITIONED) {
+ continue;
+ }
+ PlanFragmentId childFragmentId = planFragment.getFragmentId();
int currentChildFragmentParallelism =
fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size();
if (currentChildFragmentParallelism > maxParallelism) {
maxParallelism = currentChildFragmentParallelism;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
new file mode 100644
index 00000000000..b8cb5b74165
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OldCoordinatorTest.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class OldCoordinatorTest extends TestWithFeService { // regression test for #59295: legacy Coordinator shuffle-fragment instance count
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ createTable("create table test.tbl(id int, value int) distributed by
hash(id) buckets 1 properties('replication_num'='1')");
+ createTable("create table test.tbl2(id int, value int) distributed by
hash(id) buckets 1 properties('replication_num'='1')");
+ createTable("create table test.tbl3(id int, value int) distributed by
hash(id) buckets 10 properties('replication_num'='1')"); // more buckets than tbl/tbl2; presumably lets its scan fragment get multiple instances
+ }
+
+ @Test
+ public void test() throws Exception { // a shuffle join whose tbl3 input has multiple instances must itself get >= 2 instances
+
connectContext.getSessionVariable().setDisableNereidsRules(RuleType.PRUNE_EMPTY_PARTITION.name());
+ connectContext.getSessionVariable().setDisableJoinReorder(true);
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 2; // NOTE(review): presumably why tbl3's scan ends up with 2 instances - confirm
+ connectContext.getSessionVariable().setEnableLocalShuffle(false);
+ StmtExecutor stmtExecutor = getSqlStmtExecutor(
+ "select *\n"
+ + "from (\n" // the left-most side has an exchange, which triggers the
shuffle instance logic
+ + " select a.value\n"
+ + " from (select value from test.tbl group by
value)a\n"
+ + " join[broadcast] test.tbl2\n"
+ + " on a.value=tbl2.id\n"
+ + ")b\n"
+ + "join[shuffle] test.tbl3 c\n" // 2 instances, which the shuffle join
should use as the shuffle instances
+ + "on b.value = c.id\n"
+ + "join[shuffle] (\n" // 1 instance, skip
+ + " select a.value\n"
+ + " from (select value from test.tbl group by
value)a\n"
+ + " join[broadcast] test.tbl2\n"
+ + " on a.value=tbl2.id\n"
+ + ")d\n"
+ + "on c.id = d.value\n"
+ + "join[shuffle] test.tbl e\n" // 1 instance, skip
+ + "on d.value=e.id");
+
+ AtomicBoolean shuffleFragmentHasMultiInstances = new
AtomicBoolean(false); // holder so the anonymous Coordinator subclass below can set the result
+ new Coordinator(connectContext, stmtExecutor.planner()) { // subclass used only to run instance assignment and inspect the params
+ public void test() throws Exception {
+ super.processFragmentAssignmentAndParams(); // presumably fills the map read via getFragmentExecParamsMap()
+
+ Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap
= getFragmentExecParamsMap();
+ PlanFragmentId scanTbl3FragmentId = null; // fragment rooted at tbl3's OlapScanNode; NOTE(review): stays null if not found, which only surfaces as the final assertTrue failing
+ for (FragmentExecParams fragmentExecParams :
fragmentExecParamsMap.values()) {
+ PlanNode planRoot =
fragmentExecParams.fragment.getPlanRoot();
+ if (planRoot instanceof OlapScanNode && ((OlapScanNode)
planRoot).getOlapTable().getName()
+ .equals("tbl3")) {
+ scanTbl3FragmentId =
fragmentExecParams.fragment.getId();
+ break;
+ }
+ }
+
+ for (FragmentExecParams fragmentExecParams :
fragmentExecParamsMap.values()) { // look for a fragment that consumes tbl3's scan fragment and has >= 2 instances
+ List<FInstanceExecParam> instances =
fragmentExecParams.instanceExecParams;
+ boolean childScanTbl3 = false;
+ for (PlanFragment child :
fragmentExecParams.fragment.getChildren()) {
+ if (child.getFragmentId().equals(scanTbl3FragmentId)) {
+ childScanTbl3 = true;
+ break;
+ }
+ }
+ if (childScanTbl3 && instances.size() >= 2) {
+ shuffleFragmentHasMultiInstances.set(true);
+ }
+ }
+ }
+ }.test(); // runs the anonymous subclass's test(), not this JUnit method
+ Assertions.assertTrue(shuffleFragmentHasMultiInstances.get());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]