This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new f53bb5d7ff7 branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator #45365 (#45401) f53bb5d7ff7 is described below commit f53bb5d7ff72f917db8e89d5cf51f2e7ed840352 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Fri Dec 13 17:10:24 2024 +0800 branch-3.0: [fix](coordinator) Fix wrong bucket assignments by coordinator #45365 (#45401) Cherry-picked from #45365 Co-authored-by: Gabriel <liwenqi...@selectdb.com> --- .../main/java/org/apache/doris/qe/Coordinator.java | 85 +++++++++++----- .../test_colocate_join_with_different_tablets.out | 12 +++ ...est_colocate_join_with_different_tablets.groovy | 109 +++++++++++++++++++++ 3 files changed, 182 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0f6bc0212d1..2889e943e93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2734,7 +2734,8 @@ public class Coordinator implements CoordInterface { * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. * 2. Use Nereids planner. 
*/ - boolean ignoreStorageDataDistribution = params.fragment != null && params.fragment.useSerialSource(context); + boolean ignoreStorageDataDistribution = scanNodes != null && !scanNodes.isEmpty() + && params.fragment != null && params.fragment.useSerialSource(context); FragmentScanRangeAssignment assignment = params.scanRangeAssignment; for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange @@ -2744,33 +2745,69 @@ public class Coordinator implements CoordInterface { = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>()); if (ignoreStorageDataDistribution) { - FInstanceExecParam instanceParam = new FInstanceExecParam( - null, addressScanRange.getKey(), 0, params); - - for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) { - for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange - : nodeScanRangeMap.second.entrySet()) { - if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { - range.put(nodeScanRange.getKey(), Lists.newArrayList()); - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); + List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges + = ListUtil.splitBySize(scanRange, parallelExecInstanceNum); + /** + * Split scan ranges evenly into `parallelExecInstanceNum` instances. 
+ * + * + * For a fragment that contains a co-located join, + * + * scan (id = 0) -> join build (id = 2) + * | + * scan (id = 1) -> join probe (id = 2) + * + * If both `scan (id = 0)` and `scan (id = 1)` are serial operators, we will plan a local exchange + * after each of them: + * + * scan (id = 0) -> local exchange -> join build (id = 2) + * | + * scan (id = 1) -> local exchange -> join probe (id = 2) + * + * + * There is another, more complicated scenario. For example, `scan (id = 0)` has 10 partitions and + * 3 buckets, which means 3 * 10 tablets, and `scan (id = 1)` has 3 buckets and no partition, which means + * 3 tablets in total. If the expected parallelism is 8, we will get a serial scan (id = 0) and a + * non-serial scan (id = 1). For this case, we will generate a different plan with a local exchange: + * + * scan (id = 0) -> local exchange -> join build (id = 2) + * | + * scan (id = 1) -> join probe (id = 2) + */ + FInstanceExecParam firstInstanceParam = null; + for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange + : perInstanceScanRanges) { + FInstanceExecParam instanceParam = new FInstanceExecParam( + null, addressScanRange.getKey(), 0, params); + + if (firstInstanceParam == null) { + firstInstanceParam = instanceParam; + } + for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) { + instanceParam.addBucketSeq(nodeScanRangeMap.first); + for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange + : nodeScanRangeMap.second.entrySet()) { + int scanId = nodeScanRange.getKey(); + Optional<ScanNode> node = scanNodes.stream().filter( + scanNode -> scanNode.getId().asInt() == scanId).findFirst(); + Preconditions.checkArgument(node.isPresent()); + FInstanceExecParam instanceParamToScan = node.get().isSerialOperator() + ?
firstInstanceParam : instanceParam; + if (!instanceParamToScan.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), Lists.newArrayList()); + instanceParamToScan.perNodeScanRanges + .put(nodeScanRange.getKey(), Lists.newArrayList()); + } + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + instanceParamToScan.perNodeScanRanges.get(nodeScanRange.getKey()) + .addAll(nodeScanRange.getValue()); } - range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()) - .addAll(nodeScanRange.getValue()); } + params.instanceExecParams.add(instanceParam); } - List<FInstanceExecParam> instanceExecParams = new ArrayList<>(); - instanceExecParams.add(instanceParam); - for (int i = 1; i < parallelExecInstanceNum; i++) { - instanceExecParams.add(new FInstanceExecParam( - null, addressScanRange.getKey(), 0, params)); - } - int index = 0; - for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) { - instanceExecParams.get(index % instanceExecParams.size()).addBucketSeq(nodeScanRangeMap.first); - index++; + for (int i = perInstanceScanRanges.size(); i < parallelExecInstanceNum; i++) { + params.instanceExecParams.add(new FInstanceExecParam(null, addressScanRange.getKey(), 0, params)); } - params.instanceExecParams.addAll(instanceExecParams); } else { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { diff --git a/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out new file mode 100644 index 00000000000..06c381356ea --- /dev/null +++ b/regression-test/data/correctness_p0/test_colocate_join_with_different_tablets.out @@ -0,0 +1,12 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +10000007 10000007 48414 10000007 +10000007 10000007 48414 10000007 +10000007 10000007 48414 10000007 +10000007 10000007 60426 10000007 +10000007 10000007 60426 10000007 +10000007 10000007 60426 10000007 +10000007 10000007 94460 10000007 +10000007 10000007 94460 10000007 +10000007 10000007 94460 10000007 + diff --git a/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy new file mode 100644 index 00000000000..66183591686 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_colocate_join_with_different_tablets.groovy @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_colocate_join_with_different_tablets") { + sql """ + DROP TABLE IF EXISTS `USR_V_KHZHSJ_ES_POC1`; + DROP TABLE IF EXISTS `USR_TLBL_VAL_R1`; + CREATE TABLE `USR_V_KHZHSJ_ES_POC1` ( + `khid` bigint NULL, + `khh` bigint NULL + ) ENGINE=OLAP + DUPLICATE KEY(`khid`, `khh`) + DISTRIBUTED BY HASH(`khid`) BUCKETS 16 + PROPERTIES ( + "colocate_with" = "test_colocate_join_with_different_tabletsgroup1", + "replication_allocation" = "tag.location.default: 1" + ); + + + CREATE TABLE `USR_TLBL_VAL_R1` ( + `lbl_id` bigint NOT NULL COMMENT "标签ID", + `khh` bigint NULL COMMENT "客户号" + ) ENGINE=OLAP + DUPLICATE KEY(`lbl_id`, `khh`) + COMMENT '标签结果日表' + PARTITION BY LIST (`lbl_id`) + (PARTITION p0 VALUES IN ("0"), + PARTITION p1 VALUES IN ("1"), + PARTITION p29 VALUES IN ("29"), + PARTITION p35 VALUES IN ("35"), + PARTITION p57 VALUES IN ("57"), + PARTITION p352 VALUES IN ("352"), + PARTITION p402 VALUES IN ("402"), + PARTITION p523 VALUES IN ("523"), + PARTITION p2347 VALUES IN ("2347"), + PARTITION p10376 VALUES IN ("10376"), + PARTITION p42408 VALUES IN ("42408"), + PARTITION p44410 VALUES IN ("44410"), + PARTITION p48414 VALUES IN ("48414"), + PARTITION p50416 VALUES IN ("50416"), + PARTITION p52418 VALUES IN ("52418"), + PARTITION p56422 VALUES IN ("56422"), + PARTITION p60426 VALUES IN ("60426"), + PARTITION p64430 VALUES IN ("64430"), + PARTITION p66432 VALUES IN ("66432"), + PARTITION p70436 VALUES IN ("70436"), + PARTITION p72438 VALUES IN ("72438"), + PARTITION p74440 VALUES IN ("74440"), + PARTITION p78444 VALUES IN ("78444"), + PARTITION p84450 VALUES IN ("84450"), + PARTITION p86452 VALUES IN ("86452"), + PARTITION p88454 VALUES IN ("88454"), + PARTITION p90456 VALUES IN ("90456"), + PARTITION p92458 VALUES IN ("92458"), + PARTITION p94460 VALUES IN ("94460"), + PARTITION p96462 VALUES IN ("96462"), + PARTITION p98464 VALUES IN ("98464"), + PARTITION p100466 VALUES IN ("100466"), + PARTITION p102468 VALUES IN ("102468"), + PARTITION p104470 VALUES IN 
("104470"), + PARTITION p106472 VALUES IN ("106472"), + PARTITION p108474 VALUES IN ("108474"), + PARTITION p110476 VALUES IN ("110476"), + PARTITION p112478 VALUES IN ("112478"), + PARTITION p114480 VALUES IN ("114480"), + PARTITION p122488 VALUES IN ("122488"), + PARTITION p124490 VALUES IN ("124490"), + PARTITION p126492 VALUES IN ("126492"), + PARTITION p130496 VALUES IN ("130496"), + PARTITION p134500 VALUES IN ("134500"), + PARTITION p150516 VALUES IN ("150516"), + PARTITION p154520 VALUES IN ("154520"), + PARTITION p158524 VALUES IN ("158524"), + PARTITION p158525 VALUES IN ("158525"), + PARTITION p1848141 VALUES IN ("1848141"), + PARTITION p1848161 VALUES IN ("1848161"), + PARTITION p1848177 VALUES IN ("1848177"), + PARTITION p1848197 VALUES IN ("1848197"), + PARTITION p1848218 VALUES IN ("1848218")) + DISTRIBUTED BY HASH(`khh`) BUCKETS 16 + PROPERTIES ( + "colocate_with" = "test_colocate_join_with_different_tabletsgroup1", + "replication_allocation" = "tag.location.default: 1" + ); + + insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007); + insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007); + insert into USR_V_KHZHSJ_ES_POC1 values(10000007, 10000007); + + insert into USR_TLBL_VAL_R1 values(48414, 10000007); + insert into USR_TLBL_VAL_R1 values(94460, 10000007); + insert into USR_TLBL_VAL_R1 values(60426, 10000007); + """ + qt_sql """ select * from USR_V_KHZHSJ_ES_POC1 A,USR_TLBL_VAL_R1 B WHERE A.khid = B.khh order by lbl_id; """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org