This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd12a49baf [feature](Nereids) enable bucket shuffle join on fragment without scan node (#12891) bd12a49baf is described below commit bd12a49bafd68cc60b8615656184a9194539544c Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Fri Sep 23 15:01:50 2022 +0800 [feature](Nereids) enable bucket shuffle join on fragment without scan node (#12891) In the past, with the legacy planner, we could only do a bucket shuffle join on a join node belonging to a fragment with at least one scan node. However, a bucket shuffle join should be usable on every join node whose left child's data distribution satisfies the join's requirement. In Nereids, we have data distribution info on each node, so we can enable bucket shuffle join on fragments without scan nodes. --- .../doris/nereids/glue/translator/PhysicalPlanTranslator.java | 11 +++++++++-- .../doris/nereids/properties/ChildOutputPropertyDeriver.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 6 +++--- .../nereids/properties/ChildOutputPropertyDeriverTest.java | 2 +- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 9d8da28230..dd7204a3c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -35,6 +35,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; import org.apache.doris.nereids.properties.OrderKey; import 
org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; @@ -969,10 +970,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } // assemble fragment hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); + if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) { + hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED); + } connectChildFragment(hashJoinNode, 1, leftFragment, rightFragment, context); leftFragment.setPlanRoot(hashJoinNode); - // TODO: use left fragment d - DataPartition rhsJoinPartition = new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, + TPartitionType partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; + if (leftDistributionSpec.getShuffleType() != ShuffleType.NATURAL) { + partitionType = TPartitionType.HASH_PARTITIONED; + } + DataPartition rhsJoinPartition = new DataPartition(partitionType, rightPartitionExprIds.stream().map(context::findSlotRef).collect(Collectors.toList())); rightFragment.setOutputPartition(rhsJoinPartition); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 715bf5e5d8..43cedd78a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -87,7 +87,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, return PhysicalProperties.GATHER; } // TODO: change ENFORCED back to bucketed, when coordinator could process bucket on agg correctly. 
- return PhysicalProperties.createHash(new DistributionSpecHash(columns, ShuffleType.ENFORCED)); + return PhysicalProperties.createHash(new DistributionSpecHash(columns, ShuffleType.BUCKETED)); case DISTINCT_GLOBAL: default: throw new RuntimeException("Could not derive output properties for agg phase: " + agg.getAggPhase()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7755d61ece..f0d6b3e36a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -420,7 +420,6 @@ public class Coordinator { } FragmentExecParams params = fragmentExecParamsMap.get(fragment.getDestFragment().getFragmentId()); params.inputFragments.add(fragment.getFragmentId()); - } coordAddress = new TNetworkAddress(localIP, Config.rpc_port); @@ -1017,7 +1016,6 @@ public class Coordinator { int bucketSeq = 0; int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId()); - TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); // when left table is empty, it's bucketset is empty. 
// set right table destination address to the address of left table @@ -1026,6 +1024,8 @@ public class Coordinator { bucketNum = 1; destParams.instanceExecParams.get(0).bucketSeqSet.add(0); } + // process bucket shuffle join on fragment without scan node + TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); while (bucketSeq < bucketNum) { TPlanFragmentDestination dest = new TPlanFragmentDestination(); @@ -1521,7 +1521,7 @@ public class Coordinator { bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, idToBackend, addressToBackendID); } - if (!(fragmentContainsColocateJoin | fragmentContainsBucketShuffleJoin)) { + if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java index 052537c755..6e53265dfb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java @@ -311,7 +311,7 @@ public class ChildOutputPropertyDeriverTest { Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty()); Assertions.assertTrue(result.getDistributionSpec() instanceof DistributionSpecHash); DistributionSpecHash actual = (DistributionSpecHash) result.getDistributionSpec(); - Assertions.assertEquals(ShuffleType.ENFORCED, actual.getShuffleType()); + Assertions.assertEquals(ShuffleType.BUCKETED, actual.getShuffleType()); Assertions.assertEquals(Lists.newArrayList(partition).stream() .map(SlotReference::getExprId).collect(Collectors.toList()), actual.getOrderedShuffledColumns()); --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org