This is an automated email from the ASF dual-hosted git repository.

xiejiann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6889225b19e [feat](Nereids) Optimize query by pushing down aggregation 
through join on foreign key (#36035)
6889225b19e is described below

commit 6889225b19e5826d74582c518f3d38982a1e3886
Author: 谢健 <jianx...@gmail.com>
AuthorDate: Mon Jul 1 14:37:23 2024 +0800

    [feat](Nereids) Optimize query by pushing down aggregation through join on 
foreign key (#36035)
    
    ## Proposed changes
    
    This PR optimizes query performance by pushing aggregations down through
    primary-key/foreign-key inner joins when the aggregate groups by the
    foreign key. Aggregating before the join shrinks the join's input, which
    improves both speed and resource efficiency.
    
    Transformation Example:
    
    Before Optimization:
    ```
    Aggregation(group by fk)
         |
       Join(pk = fk)
       /  \
      pk  fk
    ```
    After Optimization:
    ```
     Join(pk = fk)
     /     \
    pk  Aggregation(group by fk)
           |
          fk
    ```
---
 .../doris/nereids/jobs/executor/Rewriter.java      |   6 +-
 .../apache/doris/nereids/properties/FuncDeps.java  |  19 ++
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 +-
 .../rewrite/PushDownAggThroughJoinOnPkFk.java      | 348 +++++++++++++++++++++
 .../rewrite/PushDownAggThroughJoinOnPkFkTest.java  | 158 ++++++++++
 .../shape/query38.out                              |  51 ++-
 .../shape/query87.out                              |  51 ++-
 .../noStatsRfPrune/query38.out                     |  51 ++-
 .../noStatsRfPrune/query87.out                     |  51 ++-
 .../no_stats_shape/query38.out                     |  51 ++-
 .../no_stats_shape/query87.out                     |  51 ++-
 .../rf_prune/query38.out                           |  51 ++-
 .../rf_prune/query87.out                           |  51 ++-
 .../nereids_tpcds_shape_sf100_p0/shape/query38.out |  51 ++-
 .../nereids_tpcds_shape_sf100_p0/shape/query87.out |  51 ++-
 15 files changed, 770 insertions(+), 273 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 9505bdca87d..0a2906ca055 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -110,6 +110,7 @@ import 
org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoEsScan;
 import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoJdbcScan;
 import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoOdbcScan;
 import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoin;
+import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOnPkFk;
 import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide;
 import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
 import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
@@ -348,8 +349,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
             ),
 
             // this rule should be invoked after topic "Join pull up"
-            topic("eliminate group by keys according to fd items",
-                    topDown(new EliminateGroupByKey())
+            topic("eliminate Aggregate according to fd items",
+                    topDown(new EliminateGroupByKey()),
+                    topDown(new PushDownAggThroughJoinOnPkFk())
             ),
 
             topic("Limit optimization",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java
index c17fd2eee57..be7b0853605 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java
@@ -62,6 +62,7 @@ public class FuncDeps {
     }
 
     private final Set<FuncDepsItem> items;
+    // determinants -> dependencies
     private final Map<Set<Slot>, Set<Set<Slot>>> edges;
 
     public FuncDeps() {
@@ -159,6 +160,24 @@ public class FuncDeps {
         return items.contains(new FuncDepsItem(dominate, dependency));
     }
 
+    public boolean isCircleDeps(Set<Slot> dominate, Set<Slot> dependency) {
+        return items.contains(new FuncDepsItem(dominate, dependency))
+                && items.contains(new FuncDepsItem(dependency, dominate));
+    }
+
+    /**
+     * Collect every determinant set whose recorded dependency set equals {@code dependency}.
+     * NOTE(review): the name misspells "Determinants"; renaming requires updating all callers.
+     *
+     * @param dependency dependency side to look up, compared with {@code equals}
+     * @return a new mutable set of matching determinant sets; empty when nothing matches
+     */
+    public Set<Set<Slot>> findDeterminats(Set<Slot> dependency) {
+        Set<Set<Slot>> found = new HashSet<>();
+        for (FuncDepsItem candidate : items) {
+            if (!candidate.dependencies.equals(dependency)) {
+                continue;
+            }
+            found.add(candidate.determinants);
+        }
+        return found;
+    }
+
     @Override
     public String toString() {
         return items.toString();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index d6895b4121d..dcd36420c7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -186,7 +186,7 @@ public enum RuleType {
 
     PUSH_DOWN_AGG_THROUGH_JOIN_ONE_SIDE(RuleTypeClass.REWRITE),
     PUSH_DOWN_AGG_THROUGH_JOIN(RuleTypeClass.REWRITE),
-
+    PUSH_DOWN_AGG_THROUGH_JOIN_ON_PKFK(RuleTypeClass.REWRITE),
     TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN(RuleTypeClass.REWRITE),
     TRANSPOSE_LOGICAL_SEMI_JOIN_LOGICAL_JOIN_PROJECT(RuleTypeClass.REWRITE),
     LOGICAL_SEMI_JOIN_COMMUTE(RuleTypeClass.REWRITE),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java
new file mode 100644
index 00000000000..0fb3ed11562
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFk.java
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.FuncDeps;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Project;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.JoinUtils;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.thrift.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Push down agg through join with foreign key:
+ *    Agg(group by fk/pk)
+ *     |
+ *   Join(pk = fk)
+ *   /  \
+ *  pk  fk
+ *  ======>
+ *   Join(pk = fk)
+ *   /     \
+ *  |  Agg(group by fk)
+ *  |      |
+ *  pk    fk
+ *
+ * The aggregate is pushed below a pk-fk join only when the rewrite keeps its grouping:
+ * <ul>
+ *   <li>every primary-side slot read by the aggregate is replaced by a single foreign-side slot that
+ *       determines it;</li>
+ *   <li>a replaced group-by key may only be swapped for a slot that the original group-by keys already
+ *       determine, otherwise the new grouping would be finer than the original one;</li>
+ *   <li>the foreign-side join keys must be group-by keys of the rewritten aggregate so that the join
+ *       above it can still be evaluated.</li>
+ * </ul>
+ * If these conditions fail for every pk-fk join of the cluster, the plan is left unchanged.
+ */
+public class PushDownAggThroughJoinOnPkFk implements RewriteRuleFactory {
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                logicalAggregate(
+                        innerLogicalJoin()
+                                .when(j -> !j.isMarkJoin()
+                                        && j.getOtherJoinConjuncts().isEmpty()))
+                        .when(agg -> agg.getGroupByExpressions().stream().allMatch(Slot.class::isInstance))
+                        .thenApply(ctx -> pushAgg(ctx.root, ctx.root.child()))
+                        .toRule(RuleType.PUSH_DOWN_AGG_THROUGH_JOIN_ON_PKFK),
+                logicalAggregate(
+                        logicalProject(
+                                innerLogicalJoin()
+                                        .when(j -> !j.isMarkJoin()
+                                                && j.getOtherJoinConjuncts().isEmpty()))
+                                .when(Project::isAllSlots))
+                        .when(agg -> agg.getGroupByExpressions().stream().allMatch(Slot.class::isInstance))
+                        .thenApply(ctx -> pushAgg(ctx.root, ctx.root.child().child()))
+                        .toRule(RuleType.PUSH_DOWN_AGG_THROUGH_JOIN_ON_PKFK)
+        );
+    }
+
+    /**
+     * Try each pk-fk join of the inner join cluster rooted at {@code join} and push {@code agg}
+     * below the first one that accepts it.
+     *
+     * @return the rewritten plan, or null when no pk-fk join allows the push down
+     */
+    private @Nullable Plan pushAgg(LogicalAggregate<?> agg, LogicalJoin<?, ?> join) {
+        InnerJoinCluster innerJoinCluster = new InnerJoinCluster();
+        innerJoinCluster.collectContiguousInnerJoins(join);
+        if (!innerJoinCluster.isValid()) {
+            return null;
+        }
+        for (Entry<BitSet, LogicalJoin<?, ?>> e : innerJoinCluster.getJoinsMap().entrySet()) {
+            LogicalJoin<?, ?> subJoin = e.getValue();
+            Pair<Plan, Plan> primaryAndForeign = tryExtractPrimaryForeign(subJoin);
+            if (primaryAndForeign == null) {
+                continue;
+            }
+            LogicalAggregate<?> newAgg = eliminatePrimaryOutput(
+                    agg, primaryAndForeign.first, primaryAndForeign.second, subJoin);
+            if (newAgg == null) {
+                // this pk-fk pair cannot absorb the aggregate; another pair of the cluster may still work
+                continue;
+            }
+            LogicalJoin<?, ?> newJoin = innerJoinCluster
+                    .constructJoinWithPrimary(e.getKey(), subJoin, primaryAndForeign.first);
+            if (newJoin == null) {
+                continue;
+            }
+            // constructJoinWithPrimary puts the primary plan on the right, so the aggregate goes left
+            return newJoin.withChildren(newAgg.withChildren(newJoin.left()), newJoin.right());
+        }
+        return null;
+    }
+
+    /**
+     * Rewrite {@code agg} so that it reads no slot of {@code primary}.
+     *
+     * @param pkFkJoin the join between {@code primary} and {@code foreign}; its foreign-side slots must
+     *                 stay group-by keys and outputs of the rewritten aggregate
+     * @return the rewritten aggregate, or null when it cannot be evaluated below {@code pkFkJoin}
+     */
+    private @Nullable LogicalAggregate<?> eliminatePrimaryOutput(LogicalAggregate<?> agg,
+            Plan primary, Plan foreign, LogicalJoin<?, ?> pkFkJoin) {
+        Set<Slot> primaryOutputSet = primary.getOutputSet();
+        Set<Slot> foreignOutputSet = foreign.getOutputSet();
+        Set<Slot> primarySlots = Sets.intersection(agg.getInputSlots(), primaryOutputSet);
+        DataTrait dataTrait = agg.child().getLogicalProperties().getTrait();
+        FuncDeps funcDeps = dataTrait.getAllValidFuncDeps(Sets.union(foreignOutputSet, primaryOutputSet));
+        Set<Expression> groupByKeys = ImmutableSet.copyOf(agg.getGroupByExpressions());
+
+        Map<Slot, Slot> primaryToForeignDeps = new HashMap<>();
+        for (Slot slot : primarySlots) {
+            Slot replacement = findForeignReplacement(slot, groupByKeys, primaryOutputSet, funcDeps);
+            if (replacement != null) {
+                primaryToForeignDeps.put(slot, replacement);
+            }
+        }
+
+        Set<Expression> newGroupBySlots = constructNewGroupBy(agg, primaryOutputSet, primaryToForeignDeps);
+        List<NamedExpression> newOutput = constructNewOutput(
+                agg, primaryOutputSet, primaryToForeignDeps, funcDeps, primary);
+        if (newGroupBySlots == null || newOutput == null) {
+            return null;
+        }
+        // The join is evaluated above the aggregate, so its foreign-side keys must survive the aggregate
+        // as group-by keys (one key value per group) and as outputs.
+        for (Slot joinSlot : pkFkJoin.getInputSlots()) {
+            if (!foreignOutputSet.contains(joinSlot)) {
+                continue;
+            }
+            if (!newGroupBySlots.contains(joinSlot)) {
+                return null;
+            }
+            if (!newOutput.contains(joinSlot)) {
+                newOutput.add(joinSlot);
+            }
+        }
+        return agg.withGroupByAndOutput(ImmutableList.copyOf(newGroupBySlots), ImmutableList.copyOf(newOutput));
+    }
+
+    /**
+     * Find one non-primary slot that determines {@code primarySlot}. When {@code primarySlot} is a
+     * group-by key, the candidate must also be determined by the group-by keys so grouping is unchanged.
+     *
+     * @return the replacement slot, or null when none qualifies
+     */
+    private @Nullable Slot findForeignReplacement(Slot primarySlot, Set<Expression> groupByKeys,
+            Set<Slot> primaryOutputSet, FuncDeps funcDeps) {
+        for (Set<Slot> determinant : funcDeps.findDeterminats(ImmutableSet.of(primarySlot))) {
+            if (determinant.size() != 1 || primaryOutputSet.stream().anyMatch(determinant::contains)) {
+                continue;
+            }
+            Slot candidate = determinant.iterator().next();
+            if (!groupByKeys.contains(primarySlot) || isDeterminedBy(candidate, groupByKeys, funcDeps)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    // true when slot is a group-by key or one of its recorded determinant sets consists of group-by keys
+    private boolean isDeterminedBy(Slot slot, Set<Expression> groupByKeys, FuncDeps funcDeps) {
+        if (groupByKeys.contains(slot)) {
+            return true;
+        }
+        for (Set<Slot> determinant : funcDeps.findDeterminats(ImmutableSet.of(slot))) {
+            if (groupByKeys.containsAll(determinant)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Replace primary group-by slots by their foreign replacements, preserving the original key order.
+     *
+     * @return the new group-by keys, or null if a key is not a slot or a primary key has no replacement
+     */
+    private @Nullable Set<Expression> constructNewGroupBy(LogicalAggregate<?> agg, Set<Slot> primaryOutputs,
+            Map<Slot, Slot> primaryToForeignDeps) {
+        Set<Expression> newGroupBySlots = Sets.newLinkedHashSet();
+        for (Expression expression : agg.getGroupByExpressions()) {
+            if (!(expression instanceof Slot)) {
+                return null;
+            }
+            Slot slot = (Slot) expression;
+            if (primaryOutputs.contains(slot) && !primaryToForeignDeps.containsKey(slot)) {
+                return null;
+            }
+            newGroupBySlots.add(primaryToForeignDeps.getOrDefault(slot, slot));
+        }
+        return newGroupBySlots;
+    }
+
+    /**
+     * Rewrite the output expressions of {@code agg} so that none of them reads a primary slot.
+     *
+     * @return a mutable list of new outputs, or null when some output still needs a primary slot
+     */
+    private @Nullable List<NamedExpression> constructNewOutput(LogicalAggregate<?> agg, Set<Slot> primaryOutput,
+            Map<Slot, Slot> primaryToForeignDeps, FuncDeps funcDeps, Plan primaryPlan) {
+        List<NamedExpression> newOutput = new ArrayList<>();
+        for (NamedExpression expression : agg.getOutputExpressions()) {
+            // Two cases are rewritten:
+            // 1. Slot from the primary plan: replace it with its foreign replacement,
+            //    or drop it when it has no replacement.
+            // 2. count(slot) over a primary slot: replace the slot with its foreign replacement when
+            //    the two slots determine each other.
+            // Any other expression that still reads a primary slot prevents the push down.
+            if (expression instanceof Slot && primaryOutput.contains(expression)) {
+                if (!primaryToForeignDeps.containsKey(expression)) {
+                    continue;
+                }
+                expression = primaryToForeignDeps.get(expression);
+            }
+            if (expression instanceof Alias
+                    && expression.child(0) instanceof Count
+                    // count(*) has no argument; reading child(0) of it would throw
+                    && expression.child(0).children().size() == 1
+                    && expression.child(0).child(0) instanceof Slot) {
+                Slot slot = (Slot) expression.child(0).child(0);
+                if (primaryToForeignDeps.containsKey(slot)
+                        && funcDeps.isCircleDeps(
+                                ImmutableSet.of(slot), ImmutableSet.of(primaryToForeignDeps.get(slot)))) {
+                    expression = (NamedExpression) expression.rewriteUp(e ->
+                            e instanceof Slot
+                                    ? primaryToForeignDeps.getOrDefault((Slot) e, (Slot) e)
+                                    : e);
+                }
+            }
+            if (!(expression instanceof Slot)
+                    && expression.getInputSlots().stream().anyMatch(primaryOutput::contains)) {
+                return null;
+            }
+            // several primary slots may map to the same foreign slot; emit it once
+            if (!newOutput.contains(expression)) {
+                newOutput.add(expression);
+            }
+        }
+        return newOutput;
+    }
+
+    // try to extract primary key table and foreign key table
+    private @Nullable Pair<Plan, Plan> tryExtractPrimaryForeign(LogicalJoin<?, ?> join) {
+        Plan primary;
+        Plan foreign;
+        if (JoinUtils.canEliminateByFk(join, join.left(), join.right())) {
+            primary = join.left();
+            foreign = join.right();
+        } else if (JoinUtils.canEliminateByFk(join, join.right(), join.left())) {
+            primary = join.right();
+            foreign = join.left();
+        } else {
+            return null;
+        }
+        return Pair.of(primary, foreign);
+    }
+
+    /**
+     * This class flattens nested join clusters and optimizes aggregation pushdown.
+     *
+     * Example of flattening:
+     *     Join1                   Join1         Join2
+     *    /    \                   /  \         /    \
+     *   a    Join2      =====>   a    b       b      c
+     *       /     \
+     *      b       c
+     *
+     * After flattening, we attempt to push down aggregations for each join.
+     * For instance, if b is a primary key table and c is a foreign key table:
+     *
+     * Original (can't push down):     After flattening (can push down):
+     *    agg(Join1)                       Join1         Join2
+     *    /    \                           /  \         /    \
+     *   a    Join2            =====>     a    b       b   agg(c)
+     *       /     \
+     *      b       c
+     *
+     * Finally, we can reorganize the join tree:
+     *     Join2
+     *    /     \
+     * agg(c)   Join1
+     *         /     \
+     *        a       b
+     */
+    static class InnerJoinCluster {
+        // leaf indices referenced by a join's conjuncts -> that join, rebuilt over those leaves
+        private final Map<BitSet, LogicalJoin<?, ?>> innerJoins = new HashMap<>();
+        private final List<Plan> leaf = new ArrayList<>();
+        // number of joins visited; larger than innerJoins.size() when two joins share a leaf pair
+        private int joinCount = 0;
+
+        void collectContiguousInnerJoins(Plan plan) {
+            if (!isSlotProject(plan) && !isInnerJoin(plan)) {
+                leaf.add(plan);
+                return;
+            }
+            for (Plan child : plan.children()) {
+                collectContiguousInnerJoins(child);
+            }
+            if (isInnerJoin(plan)) {
+                joinCount++;
+                LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) plan;
+                Set<Slot> inputSlots = join.getInputSlots();
+                BitSet childrenIndices = new BitSet();
+                List<Plan> children = new ArrayList<>();
+                for (int i = 0; i < leaf.size(); i++) {
+                    if (!Sets.intersection(leaf.get(i).getOutputSet(), inputSlots).isEmpty()) {
+                        childrenIndices.set(i);
+                        children.add(leaf.get(i));
+                    }
+                }
+                if (childrenIndices.cardinality() == 2) {
+                    join = join.withChildren(children);
+                }
+                innerJoins.put(childrenIndices, join);
+            }
+        }
+
+        boolean isValid() {
+            // every join must reference exactly 2 leaves, and no join may have been overwritten in
+            // innerJoins by another join on the same leaf pair (its predicate would be lost)
+            return innerJoins.size() == joinCount
+                    && innerJoins.keySet().stream().allMatch(x -> x.cardinality() == 2);
+        }
+
+        /**
+         * Rebuild the cluster as {@code join(rest, primary)}, where rest joins every other leaf with the
+         * remaining join predicates.
+         *
+         * @return the rebuilt join, or null when some join predicate cannot be placed exactly once
+         */
+        @Nullable LogicalJoin<?, ?> constructJoinWithPrimary(BitSet bitSet, LogicalJoin<?, ?> join, Plan primary) {
+            int primaryIndex = indexOfLeaf(primary);
+            if (primaryIndex < 0) {
+                return null;
+            }
+            Set<BitSet> usedJoins = new HashSet<>();
+            usedJoins.add(bitSet);
+            BitSet otherLeaves = new BitSet();
+            otherLeaves.set(0, leaf.size());
+            otherLeaves.clear(primaryIndex);
+            Plan childPlan = constructPlan(otherLeaves, usedJoins);
+            // constructPlan records every join it uses; an unused join means a dropped predicate
+            if (childPlan == null || !usedJoins.containsAll(innerJoins.keySet())) {
+                return null;
+            }
+            return (LogicalJoin<?, ?>) join.withChildren(childPlan, primary);
+        }
+
+        /**
+         * Join the leaves in {@code bitSet} using joins not in {@code forbiddenJoin}; every join used is
+         * added to {@code forbiddenJoin}.
+         *
+         * @return the plan covering exactly {@code bitSet}, or null if it cannot be built
+         */
+        @Nullable Plan constructPlan(BitSet bitSet, Set<BitSet> forbiddenJoin) {
+            if (bitSet.cardinality() == 1) {
+                return leaf.get(bitSet.nextSetBit(0));
+            }
+
+            BitSet currentBitset = new BitSet();
+            Plan currentPlan = null;
+            while (!currentBitset.equals(bitSet)) {
+                boolean addJoin = false;
+                for (Entry<BitSet, LogicalJoin<?, ?>> entry : innerJoins.entrySet()) {
+                    BitSet joinLeaves = entry.getKey();
+                    if (forbiddenJoin.contains(joinLeaves)) {
+                        continue;
+                    }
+                    if (currentBitset.isEmpty()) {
+                        currentPlan = entry.getValue();
+                    } else if (currentBitset.intersects(joinLeaves)) {
+                        BitSet newLeaves = (BitSet) joinLeaves.clone();
+                        newLeaves.andNot(currentBitset);
+                        if (newLeaves.cardinality() != 1) {
+                            // both leaves are already placed: the predicate closes a cycle and
+                            // cannot be used to attach a new leaf
+                            return null;
+                        }
+                        // attach the new leaf with this entry's own join predicate
+                        currentPlan = entry.getValue().withChildren(currentPlan, leaf.get(newLeaves.nextSetBit(0)));
+                    } else {
+                        continue;
+                    }
+                    addJoin = true;
+                    currentBitset.or(joinLeaves);
+                    forbiddenJoin.add(joinLeaves);
+                }
+                if (!addJoin) {
+                    // if we cannot find any join to add, just return null
+                    // It means we cannot construct a join
+                    return null;
+                }
+            }
+            return currentPlan;
+        }
+
+        Map<BitSet, LogicalJoin<?, ?>> getJoinsMap() {
+            return innerJoins;
+        }
+
+        // identity lookup: structurally equal leaves must not be confused with each other
+        private int indexOfLeaf(Plan plan) {
+            for (int i = 0; i < leaf.size(); i++) {
+                if (leaf.get(i) == plan) {
+                    return i;
+                }
+            }
+            return -1;
+        }
+
+        boolean isSlotProject(Plan plan) {
+            return plan instanceof LogicalProject
+                    && ((LogicalProject<?>) (plan)).isAllSlots();
+        }
+
+        boolean isInnerJoin(Plan plan) {
+            return plan instanceof LogicalJoin
+                    && ((LogicalJoin<?, ?>) plan).getJoinType().isInnerJoin()
+                    && !((LogicalJoin<?, ?>) plan).isMarkJoin()
+                    && ((LogicalJoin<?, ?>) plan).getOtherJoinConjuncts().isEmpty();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFkTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFkTest.java
new file mode 100644
index 00000000000..91e66790002
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownAggThroughJoinOnPkFkTest.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.util.MemoPatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Plan-shape tests for {@link PushDownAggThroughJoinOnPkFk}. {@code pri.id1} is a primary key referenced
+ * by {@code foreign_not_null.id2} (declared not null) and {@code foreign_null.id3} (nullable).
+ */
+class PushDownAggThroughJoinOnPkFkTest extends TestWithFeService implements MemoPatternMatchSupported {
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+        connectContext.setDatabase("default_cluster:test");
+        createTables(
+                "CREATE TABLE IF NOT EXISTS pri (\n"
+                        + "    id1 int not null,\n"
+                        + "    name char\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id1)\n"
+                        + "DISTRIBUTED BY HASH(id1) BUCKETS 10\n"
+                        + "PROPERTIES (\"replication_num\" = \"1\")\n",
+                "CREATE TABLE IF NOT EXISTS foreign_not_null (\n"
+                        + "    id2 int not null,\n"
+                        + "    name char\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id2)\n"
+                        + "DISTRIBUTED BY HASH(id2) BUCKETS 10\n"
+                        + "PROPERTIES (\"replication_num\" = \"1\")\n",
+                "CREATE TABLE IF NOT EXISTS foreign_null (\n"
+                        + "    id3 int,\n"
+                        + "    name char\n"
+                        + ")\n"
+                        + "DUPLICATE KEY(id3)\n"
+                        + "DISTRIBUTED BY HASH(id3) BUCKETS 10\n"
+                        + "PROPERTIES (\"replication_num\" = \"1\")\n"
+        );
+        addConstraint("Alter table pri add constraint pk primary key (id1)");
+        addConstraint("Alter table foreign_not_null add constraint f_not_null foreign key (id2)\n"
+                + "references pri(id1)");
+        // foreign_null.id3 is nullable, so its constraint must not be named like the not-null one
+        addConstraint("Alter table foreign_null add constraint f_null foreign key (id3)\n"
+                + "references pri(id1)");
+        connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+    }
+
+    // analyze and rewrite sql, returning the checker for pattern matching
+    private PlanChecker analyzeAndRewrite(String sql) {
+        return PlanChecker.from(connectContext)
+                .analyze(sql)
+                .rewrite();
+    }
+
+    @Test
+    void testGroupByFk() {
+        String sql = "select pri.id1 from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+    }
+
+    @Test
+    void testGroupByFkAndOther() {
+        String sql = "select pri.id1 from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1, foreign_not_null.name";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalProject(logicalAggregate()), any()))
+                .printlnTree();
+        sql = "select pri.id1 from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1, pri.name";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+        sql = "select pri.id1 from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1, pri.name, foreign_not_null.name";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalProject(logicalAggregate()), any()))
+                .printlnTree();
+    }
+
+    @Test
+    void testGroupByFkWithCount() {
+        String sql = "select count(pri.id1) from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+        sql = "select count(foreign_not_null.id2) from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+    }
+
+    @Test
+    void testGroupByFkWithForeignAgg() {
+        String sql = "select sum(foreign_not_null.id2) from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+    }
+
+    @Test
+    void testGroupByFkWithPrimaryAgg() {
+        // sum over a primary slot cannot be rewritten, so the aggregate must stay above the join
+        String sql = "select sum(pri.id1) from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1";
+        analyzeAndRewrite(sql)
+                .matches(logicalAggregate(logicalProject(logicalJoin())))
+                .printlnTree();
+    }
+
+    @Test
+    void testMultiJoin() {
+        String sql = "select count(pri.id1), pri.name from foreign_not_null inner join foreign_null on foreign_null.name = foreign_not_null.name\n"
+                + " inner join pri on pri.id1 = foreign_not_null.id2\n"
+                + "group by foreign_not_null.id2, pri.id1, pri.name";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+
+        sql = "select count(pri.id1), pri.name from pri inner join foreign_not_null on pri.id1 = foreign_not_null.id2\n"
+                + "inner join foreign_null on foreign_null.name = foreign_not_null.name\n"
+                + "group by foreign_not_null.id2, pri.id1, pri.name";
+        analyzeAndRewrite(sql)
+                .matches(logicalJoin(logicalAggregate(), any()))
+                .printlnTree();
+    }
+}
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query38.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query38.out
index 442e184d622..6f065315ce7 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query38.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query38.out
@@ -8,12 +8,12 @@ PhysicalResultSink
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
 --------------PhysicalIntersect
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 --------------------------------PhysicalProject
@@ -22,15 +22,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1200) and 
(date_dim.d_month_seq >= 1189))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 --------------------------------PhysicalProject
@@ -39,15 +38,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1200) and 
(date_dim.d_month_seq >= 1189))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ss_sold_date_sk]
 --------------------------------PhysicalProject
@@ -56,7 +54,6 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1200) and 
(date_dim.d_month_seq >= 1189))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query87.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query87.out
index 20ece13139d..181069d1e2c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query87.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query87.out
@@ -6,12 +6,12 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------PhysicalExcept
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ss_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk 
= customer.c_customer_sk)) otherCondition=() build RFs:RF1 
c_customer_sk->[ss_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
 ----------------------------PhysicalProject
@@ -20,15 +20,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1213) and 
(date_dim.d_month_seq >= 1202))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 ----------------------------PhysicalProject
@@ -37,15 +36,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1213) and 
(date_dim.d_month_seq >= 1202))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ws_sold_date_sk]
 ----------------------------PhysicalProject
@@ -54,7 +52,6 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1213) and 
(date_dim.d_month_seq >= 1202))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query38.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query38.out
index c20ce4bb741..cae7a894987 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query38.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query38.out
@@ -8,12 +8,12 @@ PhysicalResultSink
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
 --------------PhysicalIntersect
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 --------------------------------PhysicalProject
@@ -22,15 +22,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 --------------------------------PhysicalProject
@@ -39,15 +38,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ss_sold_date_sk]
 --------------------------------PhysicalProject
@@ -56,7 +54,6 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query87.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query87.out
index 2a1cbdc2655..9fb612c62ce 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query87.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query87.out
@@ -6,12 +6,12 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------PhysicalExcept
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk 
= customer.c_customer_sk)) otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
 ----------------------------PhysicalProject
@@ -20,15 +20,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 ----------------------------PhysicalProject
@@ -37,15 +36,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ws_sold_date_sk]
 ----------------------------PhysicalProject
@@ -54,7 +52,6 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query38.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query38.out
index ff4380040ba..3a1334802da 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query38.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query38.out
@@ -8,12 +8,12 @@ PhysicalResultSink
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
 --------------PhysicalIntersect
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 --------------------------------PhysicalProject
@@ -22,15 +22,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 --------------------------------PhysicalProject
@@ -39,15 +38,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ss_sold_date_sk]
 --------------------------------PhysicalProject
@@ -56,7 +54,6 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query87.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query87.out
index abf84ad40ba..66f357bee2c 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query87.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query87.out
@@ -6,12 +6,12 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------PhysicalExcept
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ss_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk 
= customer.c_customer_sk)) otherCondition=() build RFs:RF1 
c_customer_sk->[ss_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
 ----------------------------PhysicalProject
@@ -20,15 +20,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 ----------------------------PhysicalProject
@@ -37,15 +36,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ws_sold_date_sk]
 ----------------------------PhysicalProject
@@ -54,7 +52,6 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query38.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query38.out
index c20ce4bb741..cae7a894987 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query38.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query38.out
@@ -8,12 +8,12 @@ PhysicalResultSink
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
 --------------PhysicalIntersect
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 --------------------------------PhysicalProject
@@ -22,15 +22,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 --------------------------------PhysicalProject
@@ -39,15 +38,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ss_sold_date_sk]
 --------------------------------PhysicalProject
@@ -56,7 +54,6 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query87.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query87.out
index 2a1cbdc2655..9fb612c62ce 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query87.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query87.out
@@ -6,12 +6,12 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------PhysicalExcept
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk 
= customer.c_customer_sk)) otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
 ----------------------------PhysicalProject
@@ -20,15 +20,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 ----------------------------PhysicalProject
@@ -37,15 +36,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=()
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ws_sold_date_sk]
 ----------------------------PhysicalProject
@@ -54,7 +52,6 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query38.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query38.out
index ff4380040ba..3a1334802da 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query38.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query38.out
@@ -8,12 +8,12 @@ PhysicalResultSink
 ----------hashAgg[LOCAL]
 ------------PhysicalProject
 --------------PhysicalIntersect
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ws_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 --------------------------------PhysicalProject
@@ -22,15 +22,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 --------------------------------PhysicalProject
@@ -39,15 +38,14 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
-----------------hashAgg[GLOBAL]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------hashAgg[LOCAL]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
---------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ss_customer_sk]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[GLOBAL]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------hashAgg[LOCAL]
 ----------------------------PhysicalProject
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ss_sold_date_sk]
 --------------------------------PhysicalProject
@@ -56,7 +54,6 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((date_dim.d_month_seq <= 1194) and 
(date_dim.d_month_seq >= 1183))
 --------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[customer]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalOlapScan[customer]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query87.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query87.out
index abf84ad40ba..66f357bee2c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query87.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query87.out
@@ -6,12 +6,12 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------PhysicalExcept
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF1 c_customer_sk->[ss_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk 
= customer.c_customer_sk)) otherCondition=() build RFs:RF1 
c_customer_sk->[ss_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
 ----------------------------PhysicalProject
@@ -20,15 +20,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF3 c_customer_sk->[cs_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF2 d_date_sk->[cs_sold_date_sk]
 ----------------------------PhysicalProject
@@ -37,15 +36,14 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
-------------hashAgg[GLOBAL]
---------------PhysicalDistribute[DistributionSpecHash]
-----------------hashAgg[LOCAL]
-------------------PhysicalProject
---------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
+------------PhysicalDistribute[DistributionSpecHash]
+--------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF5 c_customer_sk->[ws_bill_customer_sk]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------hashAgg[GLOBAL]
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF4 d_date_sk->[ws_sold_date_sk]
 ----------------------------PhysicalProject
@@ -54,7 +52,6 @@ PhysicalResultSink
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_month_seq <= 1195) and 
(date_dim.d_month_seq >= 1184))
 ----------------------------------PhysicalOlapScan[date_dim]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer]
+----------------PhysicalDistribute[DistributionSpecHash]
+------------------PhysicalOlapScan[customer]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to