This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f53bb5d7ff7 branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator #45365 (#45401)
f53bb5d7ff7 is described below

commit f53bb5d7ff72f917db8e89d5cf51f2e7ed840352
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 13 17:10:24 2024 +0800

    branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator #45365 (#45401)
    
    Cherry-picked from #45365
    
    Co-authored-by: Gabriel <liwenqi...@selectdb.com>
---
 .../main/java/org/apache/doris/qe/Coordinator.java |  85 +++++++++++-----
 .../test_colocate_join_with_different_tablets.out  |  12 +++
 ...est_colocate_join_with_different_tablets.groovy | 109 +++++++++++++++++++++
 3 files changed, 182 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f6bc0212d1..2889e943e93 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2734,7 +2734,8 @@ public class Coordinator implements CoordInterface {
          * 1. `parallelExecInstanceNum * numBackends` is larger than scan 
ranges.
          * 2. Use Nereids planner.
          */
-        boolean ignoreStorageDataDistribution = params.fragment != null && 
params.fragment.useSerialSource(context);
+        boolean ignoreStorageDataDistribution = scanNodes != null && 
!scanNodes.isEmpty()
+                && params.fragment != null && 
params.fragment.useSerialSource(context);
 
         FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
         for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, 
List<TScanRangeParams>>>>> addressScanRange
@@ -2744,33 +2745,69 @@ public class Coordinator implements CoordInterface {
                     = findOrInsert(assignment, addressScanRange.getKey(), new 
HashMap<>());
 
             if (ignoreStorageDataDistribution) {
-                FInstanceExecParam instanceParam = new FInstanceExecParam(
-                        null, addressScanRange.getKey(), 0, params);
-
-                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
-                    for (Map.Entry<Integer, List<TScanRangeParams>> 
nodeScanRange
-                            : nodeScanRangeMap.second.entrySet()) {
-                        if 
(!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
-                            range.put(nodeScanRange.getKey(), 
Lists.newArrayList());
-                            
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), 
Lists.newArrayList());
+                List<List<Pair<Integer, Map<Integer, 
List<TScanRangeParams>>>>> perInstanceScanRanges
+                        = ListUtil.splitBySize(scanRange, 
parallelExecInstanceNum);
+                /**
+                 * Split scan ranges evenly into `parallelExecInstanceNum` 
instances.
+                 *
+                 *
+                 * For a fragment that contains a co-located join,
+                 *
+                 *      scan (id = 0) -> join build (id = 2)
+                 *                          |
+                 *      scan (id = 1) -> join probe (id = 2)
+                 *
+                 * If both `scan (id = 0)` and `scan (id = 1)` are serial operators, we will plan a local exchange
+                 * after each of them:
+                 *
+                 *      scan (id = 0) -> local exchange -> join build (id = 2)
+                 *                                               |
+                 *      scan (id = 1) -> local exchange -> join probe (id = 2)
+                 *
+                 *
+                 * There is also a more complicated scenario: for example, `scan (id = 0)` has 10 partitions and
+                 * 3 buckets (3 * 10 tablets), while `scan (id = 1)` has 3 buckets and no partitions (3 tablets
+                 * in total). If the expected parallelism is 8, we will get a serial scan (id = 0) and a
+                 * non-serial scan (id = 1). In this case, we will generate a different plan with a local exchange:
+                 *
+                 *      scan (id = 0) -> local exchange -> join build (id = 2)
+                 *                                               |
+                 *      scan (id = 1)          ->         join probe (id = 2)
+                 */
+                FInstanceExecParam firstInstanceParam = null;
+                for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> 
perInstanceScanRange
+                        : perInstanceScanRanges) {
+                    FInstanceExecParam instanceParam = new FInstanceExecParam(
+                            null, addressScanRange.getKey(), 0, params);
+
+                    if (firstInstanceParam == null) {
+                        firstInstanceParam = instanceParam;
+                    }
+                    for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : perInstanceScanRange) {
+                        instanceParam.addBucketSeq(nodeScanRangeMap.first);
+                        for (Map.Entry<Integer, List<TScanRangeParams>> 
nodeScanRange
+                                : nodeScanRangeMap.second.entrySet()) {
+                            int scanId = nodeScanRange.getKey();
+                            Optional<ScanNode> node = 
scanNodes.stream().filter(
+                                    scanNode -> scanNode.getId().asInt() == 
scanId).findFirst();
+                            Preconditions.checkArgument(node.isPresent());
+                            FInstanceExecParam instanceParamToScan = 
node.get().isSerialOperator()
+                                    ? firstInstanceParam : instanceParam;
+                            if 
(!instanceParamToScan.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
+                                range.put(nodeScanRange.getKey(), 
Lists.newArrayList());
+                                instanceParamToScan.perNodeScanRanges
+                                        .put(nodeScanRange.getKey(), 
Lists.newArrayList());
+                            }
+                            
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
+                            
instanceParamToScan.perNodeScanRanges.get(nodeScanRange.getKey())
+                                    .addAll(nodeScanRange.getValue());
                         }
-                        
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
-                        
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
-                                .addAll(nodeScanRange.getValue());
                     }
+                    params.instanceExecParams.add(instanceParam);
                 }
-                List<FInstanceExecParam> instanceExecParams = new 
ArrayList<>();
-                instanceExecParams.add(instanceParam);
-                for (int i = 1; i < parallelExecInstanceNum; i++) {
-                    instanceExecParams.add(new FInstanceExecParam(
-                            null, addressScanRange.getKey(), 0, params));
-                }
-                int index = 0;
-                for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> 
nodeScanRangeMap : scanRange) {
-                    instanceExecParams.get(index % 
instanceExecParams.size()).addBucketSeq(nodeScanRangeMap.first);
-                    index++;
+                for (int i = perInstanceScanRanges.size(); i < 
parallelExecInstanceNum; i++) {
+                    params.instanceExecParams.add(new FInstanceExecParam(null, 
addressScanRange.getKey(), 0, params));
                 }
-                params.instanceExecParams.addAll(instanceExecParams);
             } else {
                 int expectedInstanceNum = 1;
                 if (parallelExecInstanceNum > 1) {
diff --git 
a/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
 
b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
new file mode 100644
index 00000000000..06c381356ea
--- /dev/null
+++ 
b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+10000007       10000007        48414   10000007
+10000007       10000007        48414   10000007
+10000007       10000007        48414   10000007
+10000007       10000007        60426   10000007
+10000007       10000007        60426   10000007
+10000007       10000007        60426   10000007
+10000007       10000007        94460   10000007
+10000007       10000007        94460   10000007
+10000007       10000007        94460   10000007
+
diff --git 
a/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
 
b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
new file mode 100644
index 00000000000..66183591686
--- /dev/null
+++ 
b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_colocate_join_with_different_tablets") {
+    sql """
+    DROP TABLE IF EXISTS `USR_V_KHZHSJ_ES_POC1`;
+    DROP TABLE IF EXISTS `USR_TLBL_VAL_R1`;
+        CREATE TABLE `USR_V_KHZHSJ_ES_POC1` (
+          `khid` bigint NULL,
+          `khh` bigint NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`khid`, `khh`)
+        DISTRIBUTED BY HASH(`khid`) BUCKETS 16
+        PROPERTIES (
+        "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
+        "replication_allocation" = "tag.location.default: 1"
+        );
+
+
+        CREATE TABLE `USR_TLBL_VAL_R1` (
+          `lbl_id` bigint NOT NULL COMMENT "标签ID",
+          `khh` bigint NULL COMMENT "客户号"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`lbl_id`, `khh`)
+        COMMENT '标签结果日表'
+        PARTITION BY LIST (`lbl_id`)
+        (PARTITION p0 VALUES IN ("0"),
+        PARTITION p1 VALUES IN ("1"),
+        PARTITION p29 VALUES IN ("29"),
+        PARTITION p35 VALUES IN ("35"),
+        PARTITION p57 VALUES IN ("57"),
+        PARTITION p352 VALUES IN ("352"),
+        PARTITION p402 VALUES IN ("402"),
+        PARTITION p523 VALUES IN ("523"),
+        PARTITION p2347 VALUES IN ("2347"),
+        PARTITION p10376 VALUES IN ("10376"),
+        PARTITION p42408 VALUES IN ("42408"),
+        PARTITION p44410 VALUES IN ("44410"),
+        PARTITION p48414 VALUES IN ("48414"),
+        PARTITION p50416 VALUES IN ("50416"),
+        PARTITION p52418 VALUES IN ("52418"),
+        PARTITION p56422 VALUES IN ("56422"),
+        PARTITION p60426 VALUES IN ("60426"),
+        PARTITION p64430 VALUES IN ("64430"),
+        PARTITION p66432 VALUES IN ("66432"),
+        PARTITION p70436 VALUES IN ("70436"),
+        PARTITION p72438 VALUES IN ("72438"),
+        PARTITION p74440 VALUES IN ("74440"),
+        PARTITION p78444 VALUES IN ("78444"),
+        PARTITION p84450 VALUES IN ("84450"),
+        PARTITION p86452 VALUES IN ("86452"),
+        PARTITION p88454 VALUES IN ("88454"),
+        PARTITION p90456 VALUES IN ("90456"),
+        PARTITION p92458 VALUES IN ("92458"),
+        PARTITION p94460 VALUES IN ("94460"),
+        PARTITION p96462 VALUES IN ("96462"),
+        PARTITION p98464 VALUES IN ("98464"),
+        PARTITION p100466 VALUES IN ("100466"),
+        PARTITION p102468 VALUES IN ("102468"),
+        PARTITION p104470 VALUES IN ("104470"),
+        PARTITION p106472 VALUES IN ("106472"),
+        PARTITION p108474 VALUES IN ("108474"),
+        PARTITION p110476 VALUES IN ("110476"),
+        PARTITION p112478 VALUES IN ("112478"),
+        PARTITION p114480 VALUES IN ("114480"),
+        PARTITION p122488 VALUES IN ("122488"),
+        PARTITION p124490 VALUES IN ("124490"),
+        PARTITION p126492 VALUES IN ("126492"),
+        PARTITION p130496 VALUES IN ("130496"),
+        PARTITION p134500 VALUES IN ("134500"),
+        PARTITION p150516 VALUES IN ("150516"),
+        PARTITION p154520 VALUES IN ("154520"),
+        PARTITION p158524 VALUES IN ("158524"),
+        PARTITION p158525 VALUES IN ("158525"),
+        PARTITION p1848141 VALUES IN ("1848141"),
+        PARTITION p1848161 VALUES IN ("1848161"),
+        PARTITION p1848177 VALUES IN ("1848177"),
+        PARTITION p1848197 VALUES IN ("1848197"),
+        PARTITION p1848218 VALUES IN ("1848218"))
+        DISTRIBUTED BY HASH(`khh`) BUCKETS 16
+        PROPERTIES (
+        "colocate_with" = "test_colocate_join_with_different_tabletsgroup1",
+        "replication_allocation" = "tag.location.default: 1"
+        );
+
+        insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+        insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+        insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007);
+
+        insert into USR_TLBL_VAL_R1 values(48414, 10000007);
+        insert into USR_TLBL_VAL_R1 values(94460, 10000007);
+        insert into USR_TLBL_VAL_R1 values(60426, 10000007);
+    """
+    qt_sql """ select  *    from  USR_V_KHZHSJ_ES_POC1  A,USR_TLBL_VAL_R1  B  
WHERE  A.khid  =  B.khh order by lbl_id; """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to