This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d3809a0e [feature](Nereids): Enable the costAndEnforcerJob (#11604)
89d3809a0e is described below

commit 89d3809a0e92b20fb96822a07acf23271a48245a
Author: jakevin <jakevin...@gmail.com>
AuthorDate: Wed Aug 10 15:17:15 2022 +0800

    [feature](Nereids): Enable the costAndEnforcerJob (#11604)
    
    1. Enable the costAndEnforcerJob
    2. Fix some bugs in the enforcer.
    3. Polish property names and methods
---
 .../org/apache/doris/nereids/NereidsPlanner.java   |   2 +-
 .../org/apache/doris/nereids/PlannerContext.java   |   2 +-
 .../apache/doris/nereids/cost/CostCalculator.java  |  10 ++-
 .../nereids/jobs/cascades/CostAndEnforcerJob.java  |  56 +++++-------
 .../apache/doris/nereids/memo/GroupExpression.java |   2 +-
 .../properties/ChildOutputPropertyDeriver.java     |  23 +++--
 .../doris/nereids/properties/DistributionSpec.java |   8 +-
 .../nereids/properties/DistributionSpecAny.java    |   6 +-
 .../nereids/properties/DistributionSpecGather.java |   2 +
 .../nereids/properties/DistributionSpecHash.java   |  11 ++-
 .../properties/DistributionSpecReplicated.java     |   3 +
 .../properties/EnforceMissingPropertiesHelper.java |   2 +-
 .../apache/doris/nereids/properties/OrderSpec.java |  13 ++-
 .../nereids/properties/PhysicalProperties.java     |  31 +++++--
 .../nereids/properties/RequestPropertyDeriver.java |  47 +++++-----
 .../LogicalOlapScanToPhysicalOlapScan.java         |  48 ++++++++--
 .../trees/plans/logical/LogicalOlapScan.java       |   8 ++
 .../trees/plans/physical/AbstractPhysicalPlan.java |   4 +-
 .../trees/plans/physical/PhysicalOlapScan.java     |  15 +++-
 .../org/apache/doris/nereids/util/JoinUtils.java   |   7 +-
 .../doris/nereids/jobs/CostAndEnforcerJobTest.java | 100 ++++++++++-----------
 .../nereids/jobs/cascades/DeriveStatsJobTest.java  |  13 +--
 .../rules/exploration/join/JoinCommuteTest.java    |   4 +-
 .../exploration/join/JoinLAsscomProjectTest.java   |   6 +-
 .../rules/exploration/join/JoinLAsscomTest.java    |   7 +-
 .../LogicalProjectToPhysicalProjectTest.java       |  20 +++--
 .../doris/nereids/stats/StatsCalculatorTest.java   |  15 ++--
 .../doris/nereids/trees/plans/PlanEqualsTest.java  |  13 ++-
 .../doris/nereids/trees/plans/PlanOutputTest.java  |   4 +-
 .../apache/doris/nereids/util/PlanConstructor.java |  90 +++++++++++++------
 30 files changed, 338 insertions(+), 234 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 0da54c06cd..3a486c1898 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -71,7 +71,7 @@ public class NereidsPlanner extends Planner {
         }
 
         LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt;
-        PhysicalPlan physicalPlan = plan(logicalPlanAdapter.getLogicalPlan(), 
new PhysicalProperties(), ctx);
+        PhysicalPlan physicalPlan = plan(logicalPlanAdapter.getLogicalPlan(), 
PhysicalProperties.ANY, ctx);
 
         PhysicalPlanTranslator physicalPlanTranslator = new 
PhysicalPlanTranslator();
         PlanTranslatorContext planTranslatorContext = new 
PlanTranslatorContext();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerContext.java
index 7d623b85d7..0ee2c3dfe2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerContext.java
@@ -101,7 +101,7 @@ public class PlannerContext {
     }
 
     public PlannerContext setDefaultJobContext() {
-        this.currentJobContext = new JobContext(this, new 
PhysicalProperties(), Double.MAX_VALUE);
+        this.currentJobContext = new JobContext(this, PhysicalProperties.ANY, 
Double.MAX_VALUE);
         return this;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
index 6bf577cdf3..ccb99debde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostCalculator.java
@@ -44,10 +44,12 @@ public class CostCalculator {
      * Constructor.
      */
     public static double calculateCost(GroupExpression groupExpression) {
-        PlanContext planContext = new PlanContext(groupExpression);
-        CostEstimator costCalculator = new CostEstimator();
-        CostEstimate costEstimate = 
groupExpression.getPlan().accept(costCalculator, planContext);
-        return costFormula(costEstimate);
+        // TODO: Enable following code after enable stats derive.
+        // PlanContext planContext = new PlanContext(groupExpression);
+        // CostEstimator costCalculator = new CostEstimator();
+        // CostEstimate costEstimate = 
groupExpression.getPlan().accept(costCalculator, planContext);
+        // return costFormula(costEstimate);
+        return 0;
     }
 
     private static double costFormula(CostEstimate costEstimate) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index cb3f88cafe..f52dd4f2d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -47,12 +47,13 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
 
     // Children properties from parent plan node.
     // Example: Physical Hash Join
+    // [ child item: [leftProperties, rightProperties]]
     // [ [Properties {"", ANY}, Properties {"", BROADCAST}],
     //   [Properties {"", SHUFFLE_JOIN}, Properties {"", SHUFFLE_JOIN}]]
-    private List<List<PhysicalProperties>> propertiesListList;
+    private List<List<PhysicalProperties>> requestChildrenPropertyList;
 
     private List<GroupExpression> childrenBestGroupExprList;
-    private List<PhysicalProperties> childrenOutputProperties = 
Lists.newArrayList();
+    private final List<PhysicalProperties> childrenOutputProperty = 
Lists.newArrayList();
 
     // Current stage of enumeration through child groups
     private int curChildIndex = -1;
@@ -66,19 +67,6 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
         this.groupExpression = groupExpression;
     }
 
-    @Override
-    public void execute() {
-        for (Group childGroup : groupExpression.children()) {
-            if (!childGroup.isHasCost()) {
-                // TODO: interim solution
-                pushTask(new CostAndEnforcerJob(this.groupExpression, 
context));
-                pushTask(new OptimizeGroupJob(childGroup, context));
-                childGroup.setHasCost(true);
-                return;
-            }
-        }
-    }
-
     /*-
      * Please read the ORCA paper
      * - 4.1.4 Optimization.
@@ -107,36 +95,37 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
     /**
      * execute.
      */
-    //    @Override
-    public void execute1() {
+    @Override
+    public void execute() {
         // Do init logic of root plan/groupExpr of `subplan`, only run once 
per task.
         if (curChildIndex == -1) {
             curTotalCost = 0;
 
             // Get property from groupExpression plan (it's root of subplan).
             RequestPropertyDeriver requestPropertyDeriver = new 
RequestPropertyDeriver(context);
-            propertiesListList = 
requestPropertyDeriver.getRequiredPropertyListList(groupExpression);
+            requestChildrenPropertyList = 
requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
 
             curChildIndex = 0;
         }
 
-        for (; curPropertyPairIndex < propertiesListList.size(); 
curPropertyPairIndex++) {
+        for (; curPropertyPairIndex < requestChildrenPropertyList.size(); 
curPropertyPairIndex++) {
             // children input properties
-            List<PhysicalProperties> childrenInputProperties = 
propertiesListList.get(curPropertyPairIndex);
+            List<PhysicalProperties> requestChildrenProperty = 
requestChildrenPropertyList.get(curPropertyPairIndex);
 
             // Calculate cost of groupExpression and update total cost
             if (curChildIndex == 0 && prevChildIndex == -1) {
                 curTotalCost += CostCalculator.calculateCost(groupExpression);
             }
 
+            // Handle all child plan nodes.
             for (; curChildIndex < groupExpression.arity(); curChildIndex++) {
-                PhysicalProperties childInputProperties = 
childrenInputProperties.get(curChildIndex);
+                PhysicalProperties requestChildProperty = 
requestChildrenProperty.get(curChildIndex);
                 Group childGroup = groupExpression.child(curChildIndex);
 
-                // Whether the child group was optimized for this 
childInputProperties according to
+                // Whether the child group was optimized for this 
requestChildProperty according to
                 // the result of returning.
                 Optional<Pair<Double, GroupExpression>> lowestCostPlanOpt = 
childGroup.getLowestCostPlan(
-                        childInputProperties);
+                        requestChildProperty);
 
                 if (!lowestCostPlanOpt.isPresent()) {
                     // The child should be pruned due to cost prune.
@@ -149,7 +138,7 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
                     prevChildIndex = curChildIndex;
                     pushTask((CostAndEnforcerJob) clone());
                     double newCostUpperBound = context.getCostUpperBound() - 
curTotalCost;
-                    JobContext jobContext = new 
JobContext(context.getPlannerContext(), childInputProperties,
+                    JobContext jobContext = new 
JobContext(context.getPlannerContext(), requestChildProperty,
                             newCostUpperBound);
                     pushTask(new OptimizeGroupJob(childGroup, jobContext));
                     return;
@@ -157,12 +146,12 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
 
                 GroupExpression lowestCostExpr = 
lowestCostPlanOpt.get().second;
 
-                PhysicalProperties childOutputProperty = 
lowestCostExpr.getPropertyFromMap(childInputProperties);
-                // TODO: maybe need to record children lowestCostExpr
-                childrenInputProperties.set(curChildIndex, 
childOutputProperty);
+                PhysicalProperties childOutputProperty = 
lowestCostExpr.getPropertyFromMap(requestChildProperty);
+                // add childOutputProperty of children into 
childrenOutputProperty
+                childrenOutputProperty.add(childOutputProperty);
+                requestChildrenProperty.set(curChildIndex, 
childOutputProperty);
 
-                // todo: check whether split agg broadcast row count limit.
-                curTotalCost += 
lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
+                curTotalCost += 
lowestCostExpr.getLowestCostTable().get(requestChildProperty).first;
                 if (curTotalCost > context.getCostUpperBound()) {
                     break;
                 }
@@ -176,7 +165,7 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
                 // TODO: it could update the cost.
                 PhysicalProperties outputProperty = 
ChildOutputPropertyDeriver.getProperties(
                         context.getRequiredProperties(),
-                        childrenOutputProperties, groupExpression);
+                        childrenOutputProperty, groupExpression);
 
                 if (curTotalCost > context.getCostUpperBound()) {
                     break;
@@ -190,7 +179,7 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
                 // TODO: calculate stats. ??????
                 
groupExpression.getOwnerGroup().setStatistics(planContext.getStatistics());
 
-                enforce(outputProperty, childrenInputProperties);
+                enforce(outputProperty, requestChildrenProperty);
 
                 if (curTotalCost < context.getCostUpperBound()) {
                     context.setCostUpperBound(curTotalCost);
@@ -198,6 +187,7 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
             }
 
             // Reset child idx and total cost
+            childrenOutputProperty.clear();
             prevChildIndex = -1;
             curChildIndex = 0;
             curTotalCost = 0;
@@ -209,13 +199,13 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
         // groupExpression can satisfy its own output property
         putProperty(groupExpression, outputProperty, outputProperty, 
childrenInputProperties);
         // groupExpression can satisfy the ANY type output property
-        putProperty(groupExpression, outputProperty, new PhysicalProperties(), 
childrenInputProperties);
+        putProperty(groupExpression, outputProperty, PhysicalProperties.ANY, 
childrenInputProperties);
 
         EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new 
EnforceMissingPropertiesHelper(context,
                 groupExpression, curTotalCost);
 
         PhysicalProperties requestedProperties = 
context.getRequiredProperties();
-        if (!outputProperty.meet(requestedProperties)) {
+        if (!outputProperty.satisfy(requestedProperties)) {
             Pair<PhysicalProperties, Double> pair = 
enforceMissingPropertiesHelper.enforceProperty(outputProperty,
                     requestedProperties);
             PhysicalProperties addEnforcedProperty = pair.first;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index d7a1508ace..44f8d0f86c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -168,7 +168,7 @@ public class GroupExpression {
             return false;
         }
         GroupExpression that = (GroupExpression) o;
-        return children.equals(that.children) && plan.equals(that.plan);
+        return plan.equals(that.plan) && children.equals(that.children);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 3a55cf858b..1902411c27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -65,7 +65,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
 
     @Override
     public PhysicalProperties visit(Plan plan, PlanContext context) {
-        return new PhysicalProperties();
+        return PhysicalProperties.ANY;
     }
 
     @Override
@@ -75,26 +75,19 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
         PhysicalProperties rightOutputProperty = 
childrenOutputProperties.get(1);
 
         // broadcast
+        // TODO: handle condition of broadcast
         if (rightOutputProperty.getDistributionSpec() instanceof 
DistributionSpecReplicated) {
-            // TODO
             return leftOutputProperty;
         }
 
         // shuffle
-        // List<SlotReference> leftSlotRefs = 
hashJoin.left().getOutput().stream().map(slot -> (SlotReference) slot)
-        //        .collect(Collectors.toList());
-        // List<SlotReference> rightSlotRefs = 
hashJoin.right().getOutput().stream().map(slot -> (SlotReference) slot)
-        //                .collect(Collectors.toList());
-
-        //        List<SlotReference> leftOnSlotRefs;
-        //        List<SlotReference> rightOnSlotRefs;
-        //        Preconditions.checkState(leftOnSlotRefs.size() == 
rightOnSlotRefs.size());
+        // TODO: handle condition of shuffle
         DistributionSpec leftDistribution = 
leftOutputProperty.getDistributionSpec();
         DistributionSpec rightDistribution = 
rightOutputProperty.getDistributionSpec();
         if (!(leftDistribution instanceof DistributionSpecHash)
                 || !(rightDistribution instanceof DistributionSpecHash)) {
             Preconditions.checkState(false, "error");
-            return new PhysicalProperties();
+            return PhysicalProperties.ANY;
         }
 
         return leftOutputProperty;
@@ -128,7 +121,7 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
         if (!(leftDistribution instanceof DistributionSpecHash)
                 || !(rightDistribution instanceof DistributionSpecHash)) {
             Preconditions.checkState(false, "error");
-            return new PhysicalProperties();
+            return PhysicalProperties.ANY;
         }
 
         return leftOutputProperty;
@@ -136,6 +129,10 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
 
     @Override
     public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, 
PlanContext context) {
-        return olapScan.getPhysicalProperties();
+        if (olapScan.getDistributionSpec() instanceof DistributionSpecHash) {
+            return PhysicalProperties.createHash((DistributionSpecHash) 
olapScan.getDistributionSpec());
+        } else {
+            return PhysicalProperties.ANY;
+        }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
index d5568de293..250049e16d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpec.java
@@ -49,6 +49,10 @@ public abstract class DistributionSpec {
         return new GroupExpression(distribution, Lists.newArrayList(child));
     }
 
+    @Override
+    public String toString() {
+        return this.getClass().toString();
+    }
 
     @Override
     public boolean equals(Object o) {
@@ -62,7 +66,7 @@ public abstract class DistributionSpec {
     }
 
     @Override
-    public String toString() {
-        return this.getClass().toString();
+    public int hashCode() {
+        return 0;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
index 0457b8204b..0bc75ffce8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecAny.java
@@ -22,16 +22,12 @@ package org.apache.doris.nereids.properties;
  */
 public class DistributionSpecAny extends DistributionSpec {
 
-    private static DistributionSpecAny instance = new DistributionSpecAny();
+    public static final DistributionSpecAny INSTANCE = new 
DistributionSpecAny();
 
     private DistributionSpecAny() {
         super();
     }
 
-    public static DistributionSpecAny getInstance() {
-        return instance;
-    }
-
     @Override
     public boolean satisfy(DistributionSpec other) {
         return other instanceof DistributionSpecAny;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
index 709a582e0c..8bbb8fff2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecGather.java
@@ -22,6 +22,8 @@ package org.apache.doris.nereids.properties;
  */
 public class DistributionSpecGather extends DistributionSpec {
 
+    public static final DistributionSpecGather INSTANCE = new 
DistributionSpecGather();
+
     public DistributionSpecGather() {
         super();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
index fb7cfe22df..eeb58e1ba1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java
@@ -22,6 +22,7 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 
 
 /**
@@ -48,7 +49,6 @@ public class DistributionSpecHash extends DistributionSpec {
         return shuffleType;
     }
 
-
     @Override
     public boolean satisfy(DistributionSpec other) {
         if (other instanceof DistributionSpecAny) {
@@ -66,9 +66,9 @@ public class DistributionSpecHash extends DistributionSpec {
         }
 
         // TODO: need consider following logic whether is right, and maybe 
need consider more.
-
+        // TODO: consider Agg.
         // Current shuffleType is LOCAL/AGG, allow if current is contained by 
other
-        if (shuffleType == ShuffleType.LOCAL && spec.shuffleType == 
ShuffleType.AGG) {
+        if (shuffleType == ShuffleType.LOCAL || spec.shuffleType == 
ShuffleType.AGG) {
             return new 
HashSet<>(spec.shuffledColumns).containsAll(shuffledColumns);
         }
 
@@ -98,6 +98,11 @@ public class DistributionSpecHash extends DistributionSpec {
         // && propertyInfo.equals(that.propertyInfo)
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(shuffledColumns, shuffleType);
+    }
+
     /**
      * Enums for concrete shuffle type.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
index 850dc11995..baae83d9ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecReplicated.java
@@ -22,6 +22,9 @@ package org.apache.doris.nereids.properties;
  * Like: broadcast join.
  */
 public class DistributionSpecReplicated extends DistributionSpec {
+
+    public static final DistributionSpecReplicated INSTANCE = new 
DistributionSpecReplicated();
+
     @Override
     public boolean satisfy(DistributionSpec other) {
         return other instanceof DistributionSpecReplicated || other instanceof 
DistributionSpecAny;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
index 73d23cbb27..aa7a94161e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java
@@ -46,7 +46,7 @@ public class EnforceMissingPropertiesHelper {
      * Enforce missing property.
      */
     public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties 
output, PhysicalProperties required) {
-        boolean isMeetOrder = 
output.getOrderSpec().meet(required.getOrderSpec());
+        boolean isMeetOrder = 
output.getOrderSpec().satisfy(required.getOrderSpec());
         boolean isMeetDistribution = 
output.getDistributionSpec().satisfy(required.getDistributionSpec());
 
         if (!isMeetDistribution && !isMeetOrder) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
index b0ef63329d..4816a28cdf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/OrderSpec.java
@@ -25,6 +25,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHeapSort;
 import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Spec of sort order.
@@ -45,7 +46,7 @@ public class OrderSpec {
      *
      * @param other another OrderSpec.
      */
-    public boolean meet(OrderSpec other) {
+    public boolean satisfy(OrderSpec other) {
         if (this.orderKeys.size() < other.getOrderKeys().size()) {
             return false;
         }
@@ -69,6 +70,11 @@ public class OrderSpec {
         return orderKeys;
     }
 
+    @Override
+    public String toString() {
+        return "Order: (" + orderKeys + ")";
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -80,4 +86,9 @@ public class OrderSpec {
         OrderSpec that = (OrderSpec) o;
         return orderKeys.equals(that.orderKeys);
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(orderKeys);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index feb448b33b..eb03239d95 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -17,28 +17,36 @@
 
 package org.apache.doris.nereids.properties;
 
+import java.util.Objects;
+
 /**
  * Physical properties used in cascades.
- * TODO(wj): Do we need to `PhysicalPropertySpec` Interface like NoisePage?
  */
 public class PhysicalProperties {
+
+    public static PhysicalProperties ANY = new PhysicalProperties();
+
+    public static PhysicalProperties REPLICATED = new 
PhysicalProperties(DistributionSpecReplicated.INSTANCE);
+
+    public static PhysicalProperties GATHER = new 
PhysicalProperties(DistributionSpecGather.INSTANCE);
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
 
-    public PhysicalProperties() {
+    private PhysicalProperties() {
         this.orderSpec = new OrderSpec();
-        this.distributionSpec = DistributionSpecAny.getInstance();
+        this.distributionSpec = DistributionSpecAny.INSTANCE;
     }
 
-    public PhysicalProperties(DistributionSpec distributionSpec) {
+    private PhysicalProperties(DistributionSpec distributionSpec) {
         this.distributionSpec = distributionSpec;
         this.orderSpec = new OrderSpec();
     }
 
     public PhysicalProperties(OrderSpec orderSpec) {
         this.orderSpec = orderSpec;
-        this.distributionSpec = DistributionSpecAny.getInstance();
+        this.distributionSpec = DistributionSpecAny.INSTANCE;
     }
 
     public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec 
orderSpec) {
@@ -46,9 +54,13 @@ public class PhysicalProperties {
         this.orderSpec = orderSpec;
     }
 
+    public static PhysicalProperties createHash(DistributionSpecHash 
distributionSpecHash) {
+        return new PhysicalProperties(distributionSpecHash);
+    }
+
     // Current properties satisfies other properties.
-    public boolean meet(PhysicalProperties other) {
-        return orderSpec.meet(other.orderSpec) && 
distributionSpec.satisfy(other.distributionSpec);
+    public boolean satisfy(PhysicalProperties other) {
+        return orderSpec.satisfy(other.orderSpec) && 
distributionSpec.satisfy(other.distributionSpec);
     }
 
     public OrderSpec getOrderSpec() {
@@ -71,4 +83,9 @@ public class PhysicalProperties {
         return orderSpec.equals(that.orderSpec)
                 && distributionSpec.equals(that.distributionSpec);
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(orderSpec, distributionSpec);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 112d0db3e6..31613a391b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -54,7 +54,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, 
PlanContext> {
         this.requestPropertyFromParent = context.getRequiredProperties();
     }
 
-    public List<List<PhysicalProperties>> 
getRequiredPropertyListList(GroupExpression groupExpression) {
+    public List<List<PhysicalProperties>> 
getRequestChildrenPropertyList(GroupExpression groupExpression) {
         requestPropertyToChildren = Lists.newArrayList();
         groupExpression.getPlan().accept(this, new 
PlanContext(groupExpression));
         return requestPropertyToChildren;
@@ -64,7 +64,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, 
PlanContext> {
     public Void visit(Plan plan, PlanContext context) {
         List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
         for (int i = 0; i < context.getGroupExpression().arity(); i++) {
-            requiredPropertyList.add(new PhysicalProperties());
+            requiredPropertyList.add(PhysicalProperties.ANY);
         }
         requestPropertyToChildren.add(requiredPropertyList);
         return null;
@@ -72,21 +72,22 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
 
     @Override
     public Void visitPhysicalHashJoin(PhysicalHashJoin<Plan, Plan> hashJoin, 
PlanContext context) {
-        // for broadcast join
-        List<PhysicalProperties> propertiesForBroadcast = Lists.newArrayList(
-                new PhysicalProperties(),
-                new PhysicalProperties(new DistributionSpecReplicated())
-        );
         // for shuffle join
-        Pair<List<SlotReference>, List<SlotReference>> onClauseUsedSlots = 
JoinUtils.getOnClauseUsedSlots(hashJoin);
-        List<PhysicalProperties> propertiesForShuffle = Lists.newArrayList(
-                new PhysicalProperties(new 
DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
-                new PhysicalProperties(new 
DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.JOIN)));
-
         if (!JoinUtils.onlyBroadcast(hashJoin)) {
+            Pair<List<SlotReference>, List<SlotReference>> onClauseUsedSlots = 
JoinUtils.getOnClauseUsedSlots(hashJoin);
+            List<PhysicalProperties> propertiesForShuffle = Lists.newArrayList(
+                    PhysicalProperties.createHash(new 
DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
+                    PhysicalProperties.createHash(
+                            new DistributionSpecHash(onClauseUsedSlots.second, 
ShuffleType.JOIN)));
+
             requestPropertyToChildren.add(propertiesForShuffle);
         }
+        // for broadcast join
         if (!JoinUtils.onlyShuffle(hashJoin)) {
+            List<PhysicalProperties> propertiesForBroadcast = 
Lists.newArrayList(
+                    PhysicalProperties.ANY,
+                    PhysicalProperties.REPLICATED
+            );
             requestPropertyToChildren.add(propertiesForBroadcast);
         }
 
@@ -98,15 +99,15 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
         // TODO: copy from physicalHashJoin, should update according to 
physical nested loop join properties.
         // for broadcast join
         List<PhysicalProperties> propertiesForBroadcast = Lists.newArrayList(
-                new PhysicalProperties(),
-                new PhysicalProperties(new DistributionSpecReplicated())
+                PhysicalProperties.ANY,
+                PhysicalProperties.REPLICATED
         );
         // for shuffle join
         Pair<List<SlotReference>, List<SlotReference>> onClauseUsedSlots
                 = JoinUtils.getOnClauseUsedSlots(nestedLoopJoin);
         List<PhysicalProperties> propertiesForShuffle = Lists.newArrayList(
-                new PhysicalProperties(new 
DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
-                new PhysicalProperties(new 
DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.JOIN)));
+                PhysicalProperties.createHash(new 
DistributionSpecHash(onClauseUsedSlots.first, ShuffleType.JOIN)),
+                PhysicalProperties.createHash(new 
DistributionSpecHash(onClauseUsedSlots.second, ShuffleType.JOIN)));
 
         if (!JoinUtils.onlyBroadcast(nestedLoopJoin)) {
             requestPropertyToChildren.add(propertiesForShuffle);
@@ -127,8 +128,8 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
                 && ((DistributionSpecHash) 
requestedProperty.getDistributionSpec()).getShuffleType()
                 == ShuffleType.JOIN)) {
             return Lists.newArrayList(
-                    new PhysicalProperties(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
-                    new PhysicalProperties(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+                    PhysicalProperties.createHash(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                    PhysicalProperties.createHash(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
         }
 
         // adjust the required property shuffle columns based on the column 
order required by parent
@@ -139,15 +140,13 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
         boolean adjustBasedOnRight = 
Utils.equalsIgnoreOrder(rightShuffleColumns, requestedColumns);
         if (!adjustBasedOnLeft && !adjustBasedOnRight) {
             return Lists.newArrayList(
-                    new PhysicalProperties(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
-                    new PhysicalProperties(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
+                    PhysicalProperties.createHash(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                    PhysicalProperties.createHash(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
         }
 
         return Lists.newArrayList(
-                new PhysicalProperties(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
-                new PhysicalProperties(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
-
+                PhysicalProperties.createHash(new 
DistributionSpecHash(leftShuffleColumns, ShuffleType.JOIN)),
+                PhysicalProperties.createHash(new 
DistributionSpecHash(rightShuffleColumns, ShuffleType.JOIN)));
     }
-
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index d785949ff0..304a37c623 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -17,11 +17,23 @@
 
 package org.apache.doris.nereids.rules.implementation;
 
-import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.nereids.properties.DistributionSpec;
+import org.apache.doris.nereids.properties.DistributionSpecAny;
+import org.apache.doris.nereids.properties.DistributionSpecHash;
+import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.util.Utils;
 
+import com.google.common.collect.Lists;
+
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -31,12 +43,34 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
     @Override
     public Rule build() {
         return logicalOlapScan().then(olapScan ->
-                // TODO: olapScan should get (OlapTable);
-                new PhysicalOlapScan(
-                    (OlapTable) olapScan.getTable(),
-                    olapScan.getQualifier(),
-                    Optional.empty(),
-                    olapScan.getLogicalProperties())
+            new PhysicalOlapScan(
+                olapScan.getTable(),
+                olapScan.getQualifier(),
+                convertDistribution(olapScan),
+                Optional.empty(),
+                olapScan.getLogicalProperties())
         ).toRule(RuleType.LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE);
     }
+
+    private DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
+        DistributionInfo distributionInfo = 
olapScan.getTable().getDefaultDistributionInfo();
+        if (distributionInfo instanceof HashDistributionInfo) {
+            HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) 
distributionInfo;
+
+            List<SlotReference> output = 
Utils.getOutputSlotReference(olapScan);
+            List<SlotReference> hashColumns = Lists.newArrayList();
+            List<Column> schemaColumns = olapScan.getTable().getFullSchema();
+            for (int i = 0; i < schemaColumns.size(); i++) {
+                for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    if (schemaColumns.get(i).equals(column)) {
+                        hashColumns.add(output.get(i));
+                    }
+                }
+            }
+            return new DistributionSpecHash(hashColumns, ShuffleType.LOCAL);
+        } else {
+            // RandomDistributionInfo
+            return DistributionSpecAny.INSTANCE;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index e688106d1d..c071a84ea1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.trees.plans.logical;
 
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
@@ -24,6 +25,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
@@ -55,6 +57,12 @@ public class LogicalOlapScan extends LogicalRelation  {
         super(PlanType.LOGICAL_OLAP_SCAN, table, qualifier, groupExpression, 
logicalProperties);
     }
 
+    @Override
+    public OlapTable getTable() {
+        Preconditions.checkArgument(table instanceof OlapTable);
+        return (OlapTable) table;
+    }
+
     @Override
     public String toString() {
         return "ScanOlapTable ("
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
index f60c8b6591..4b2349fbca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java
@@ -41,7 +41,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
     public AbstractPhysicalPlan(PlanType type, LogicalProperties 
logicalProperties, Plan... children) {
         super(type, Optional.empty(), Optional.of(logicalProperties), 
children);
         // TODO: compute physical properties
-        this.physicalProperties = new PhysicalProperties();
+        this.physicalProperties = PhysicalProperties.ANY;
     }
 
     /**
@@ -56,7 +56,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
                                 LogicalProperties logicalProperties, Plan... 
children) {
         super(type, groupExpression, Optional.of(logicalProperties), children);
         // TODO: compute physical properties
-        this.physicalProperties = new PhysicalProperties();
+        this.physicalProperties = PhysicalProperties.ANY;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index c2206961a1..03ed6d9677 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.physical;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -39,8 +40,9 @@ public class PhysicalOlapScan extends PhysicalRelation {
     private final long selectedIndexId;
     private final List<Long> selectedTabletId;
     private final List<Long> selectedPartitionId;
-
     private final OlapTable olapTable;
+    private final DistributionSpec distributionSpec;
+
 
     /**
      * Constructor for PhysicalOlapScan.
@@ -48,13 +50,14 @@ public class PhysicalOlapScan extends PhysicalRelation {
      * @param olapTable OlapTable in Doris
      * @param qualifier qualifier of table name
      */
-    public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier,
+    public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier, 
DistributionSpec distributionSpec,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties) {
         super(PlanType.PHYSICAL_OLAP_SCAN, qualifier, groupExpression, 
logicalProperties);
         this.olapTable = olapTable;
         this.selectedIndexId = olapTable.getBaseIndexId();
         this.selectedTabletId = Lists.newArrayList();
         this.selectedPartitionId = olapTable.getPartitionIds();
+        this.distributionSpec = distributionSpec;
         for (Partition partition : olapTable.getAllPartitions()) {
             
selectedTabletId.addAll(partition.getBaseIndex().getTabletIdsInOrder());
         }
@@ -76,6 +79,10 @@ public class PhysicalOlapScan extends PhysicalRelation {
         return olapTable;
     }
 
+    public DistributionSpec getDistributionSpec() {
+        return distributionSpec;
+    }
+
     @Override
     public String toString() {
         return "PhysicalOlapScan (["
@@ -110,11 +117,11 @@ public class PhysicalOlapScan extends PhysicalRelation {
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
-        return new PhysicalOlapScan(olapTable, qualifier, groupExpression, 
logicalProperties);
+        return new PhysicalOlapScan(olapTable, qualifier, distributionSpec, 
groupExpression, logicalProperties);
     }
 
     @Override
     public Plan withLogicalProperties(Optional<LogicalProperties> 
logicalProperties) {
-        return new PhysicalOlapScan(olapTable, qualifier, Optional.empty(), 
logicalProperties.get());
+        return new PhysicalOlapScan(olapTable, qualifier, distributionSpec, 
Optional.empty(), logicalProperties.get());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 23417ae1d3..1246838c05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utils for join
@@ -80,7 +81,10 @@ public class JoinUtils {
             return false;
         }
 
-        return Utils.equalsIgnoreOrder(leftUsed, leftSlots) || 
Utils.equalsIgnoreOrder(rightUsed, rightSlots);
+        Set<SlotReference> leftSlotsSet = new HashSet<>(leftSlots);
+        Set<SlotReference> rightSlotsSet = new HashSet<>(rightSlots);
+        return (leftSlotsSet.containsAll(leftUsed) && 
rightSlotsSet.containsAll(rightUsed))
+                || (leftSlotsSet.containsAll(rightUsed) && 
rightSlotsSet.containsAll(leftUsed));
     }
 
     /**
@@ -114,6 +118,7 @@ public class JoinUtils {
             }
         }
 
+        Preconditions.checkState(childSlots.first.size() == 
childSlots.second.size());
         return childSlots;
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
index d96f42b054..5a523ff142 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/CostAndEnforcerJobTest.java
@@ -17,35 +17,29 @@
 
 package org.apache.doris.nereids.jobs;
 
-import org.apache.doris.catalog.AggregateType;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.nereids.PlannerContext;
 import org.apache.doris.nereids.cost.CostCalculator;
 import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.memo.Memo;
-import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.JoinType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
-import org.apache.doris.nereids.types.IntegerType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.PlanConstructor;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class CostAndEnforcerJobTest {
     /*
@@ -55,8 +49,33 @@ public class CostAndEnforcerJobTest {
      *        /    \
      *       A      B
      */
+
+    private static List<LogicalOlapScan> scans = Lists.newArrayList();
+    private static List<List<SlotReference>> outputs = Lists.newArrayList();
+
+    @BeforeAll
+    public static void init() {
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "a", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "b", 1);
+        LogicalOlapScan scan3 = PlanConstructor.newLogicalOlapScan(2, "c", 0);
+
+        scans.add(scan1);
+        scans.add(scan2);
+        scans.add(scan3);
+
+        List<SlotReference> t1Output = scan1.getOutput().stream().map(slot -> 
(SlotReference) slot)
+                .collect(Collectors.toList());
+        List<SlotReference> t2Output = scan2.getOutput().stream().map(slot -> 
(SlotReference) slot)
+                .collect(Collectors.toList());
+        List<SlotReference> t3Output = scan3.getOutput().stream().map(slot -> 
(SlotReference) slot)
+                .collect(Collectors.toList());
+        outputs.add(t1Output);
+        outputs.add(t2Output);
+        outputs.add(t3Output);
+    }
+
     @Test
-    public void testExecute(@Mocked LogicalProperties logicalProperties) {
+    public void testExecute() {
         new MockUp<CostCalculator>() {
             @Mock
             public double calculateCost(GroupExpression groupExpression) {
@@ -64,51 +83,22 @@ public class CostAndEnforcerJobTest {
             }
         };
 
-        OlapTable aOlapTable = new OlapTable(0L, "a",
-                ImmutableList.of(new Column("id", Type.INT, true, 
AggregateType.NONE, "0", ""),
-                        new Column("name", Type.STRING, true, 
AggregateType.NONE, "", "")),
-                KeysType.PRIMARY_KEYS,
-                null, null);
-        OlapTable bOlapTable = new OlapTable(0L, "b",
-                ImmutableList.of(new Column("id", Type.INT, true, 
AggregateType.NONE, "0", ""),
-                        new Column("name", Type.STRING, true, 
AggregateType.NONE, "", "")),
-                KeysType.PRIMARY_KEYS,
-                null, null);
-        PhysicalOlapScan aScan = new PhysicalOlapScan(aOlapTable, 
Lists.newArrayList("a"), Optional.empty(),
-                logicalProperties);
-        PhysicalOlapScan bScan = new PhysicalOlapScan(bOlapTable, 
Lists.newArrayList("b"), Optional.empty(),
-                logicalProperties);
+        /*
+         *   bottomJoin
+         *    /    \
+         *   A      B
+         */
+        Expression bottomJoinOnCondition = new EqualTo(outputs.get(0).get(0), 
outputs.get(1).get(0));
 
+        LogicalJoin<LogicalOlapScan, LogicalOlapScan> bottomJoin = new 
LogicalJoin<>(JoinType.INNER_JOIN,
+                Optional.of(bottomJoinOnCondition), scans.get(0), 
scans.get(1));
 
-        OlapTable cOlapTable = new OlapTable(0L, "c",
-                ImmutableList.of(new Column("id", Type.INT, true, 
AggregateType.NONE, "0", ""),
-                        new Column("name", Type.STRING, true, 
AggregateType.NONE, "", "")),
-                KeysType.PRIMARY_KEYS,
-                null, null);
-        PhysicalPlan cScan = new PhysicalOlapScan(cOlapTable, 
Lists.newArrayList("c"), Optional.empty(),
-                logicalProperties);
-
-        Expression bottomJoinOnCondition = new EqualTo(
-                new SlotReference("id", new IntegerType(), true, 
ImmutableList.of("a")),
-                new SlotReference("id", new IntegerType(), true, 
ImmutableList.of("b")));
-        Expression topJoinOnCondition = new EqualTo(
-                new SlotReference("id", new IntegerType(), true, 
ImmutableList.of("a")),
-                new SlotReference("id", new IntegerType(), true, 
ImmutableList.of("c")));
-
-        PhysicalHashJoin bottomJoin = new 
PhysicalHashJoin<>(JoinType.INNER_JOIN,
-                Optional.of(bottomJoinOnCondition),
-                logicalProperties, aScan, bScan);
-        PhysicalHashJoin topJoin = new PhysicalHashJoin<>(JoinType.INNER_JOIN,
-                Optional.of(topJoinOnCondition),
-                logicalProperties, cScan, bottomJoin);
-
-
-        PlannerContext plannerContext = new 
Memo(topJoin).newPlannerContext(new ConnectContext())
+        PlannerContext plannerContext = new 
Memo(bottomJoin).newPlannerContext(new ConnectContext())
                 .setDefaultJobContext();
-
-        OptimizeGroupJob optimizeGroupJob = new 
OptimizeGroupJob(plannerContext.getMemo().getRoot(),
-                plannerContext.getCurrentJobContext());
-        plannerContext.pushJob(optimizeGroupJob);
+        plannerContext.pushJob(
+                new OptimizeGroupJob(
+                        plannerContext.getMemo().getRoot(),
+                        plannerContext.getCurrentJobContext()));
         plannerContext.getJobScheduler().executeJobPool(plannerContext);
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJobTest.java
index 4fc8c703aa..915ef31470 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJobTest.java
@@ -18,14 +18,13 @@
 package org.apache.doris.nereids.jobs.cascades;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.PlannerContext;
 import org.apache.doris.nereids.jobs.JobContext;
 import org.apache.doris.nereids.memo.Memo;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.Sum;
@@ -41,7 +40,6 @@ import org.apache.doris.statistics.StatisticsManager;
 import org.apache.doris.statistics.StatsDeriveResult;
 import org.apache.doris.statistics.TableStats;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import mockit.Expectations;
 import mockit.Mocked;
@@ -102,14 +100,9 @@ public class DeriveStatsJobTest {
                 result = statistics;
             }};
 
-        Table table1 = PlanConstructor.newTable(tableId1, "t1");
+        OlapTable table1 = PlanConstructor.newOlapTable(tableId1, "t1", 0);
         return new LogicalOlapScan(table1, 
Collections.emptyList()).withLogicalProperties(
-                Optional.of(new LogicalProperties(new Supplier<List<Slot>>() {
-                    @Override
-                    public List<Slot> get() {
-                        return Collections.singletonList(slot1);
-                    }
-                })));
+                Optional.of(new LogicalProperties(() -> 
ImmutableList.of(slot1))));
     }
 
     private LogicalAggregate constructAgg(Plan child) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
index 14151ce829..d7a5e50abf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
@@ -40,8 +40,8 @@ import java.util.Optional;
 public class JoinCommuteTest {
     @Test
     public void testInnerJoinCommute(@Mocked PlannerContext plannerContext) {
-        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan("t2");
-        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan("t2");
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
 
         Expression onCondition = new EqualTo(
                 new SlotReference("id", new BigIntType(), true, 
ImmutableList.of("table1")),
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomProjectTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomProjectTest.java
index eda887fe28..3ff1633d7c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomProjectTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomProjectTest.java
@@ -50,9 +50,9 @@ public class JoinLAsscomProjectTest {
 
     @BeforeAll
     public static void init() {
-        LogicalOlapScan scan1 = 
PlanConstructor.newLogicalOlapScanWithTable("t1");
-        LogicalOlapScan scan2 = 
PlanConstructor.newLogicalOlapScanWithTable("t2");
-        LogicalOlapScan scan3 = 
PlanConstructor.newLogicalOlapScanWithTable("t3");
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
+        LogicalOlapScan scan3 = PlanConstructor.newLogicalOlapScan(2, "t3", 0);
 
         scans.add(scan1);
         scans.add(scan2);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomTest.java
index 6d7984db5d..c15c27130e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinLAsscomTest.java
@@ -46,9 +46,10 @@ public class JoinLAsscomTest {
 
     @BeforeAll
     public static void init() {
-        LogicalOlapScan scan1 = 
PlanConstructor.newLogicalOlapScanWithTable("t1");
-        LogicalOlapScan scan2 = 
PlanConstructor.newLogicalOlapScanWithTable("t2");
-        LogicalOlapScan scan3 = 
PlanConstructor.newLogicalOlapScanWithTable("t3");
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
+        LogicalOlapScan scan3 = PlanConstructor.newLogicalOlapScan(2, "t3", 0);
+
         scans.add(scan1);
         scans.add(scan2);
         scans.add(scan3);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
index a16f9d5ef1..b5f5aa0db4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/LogicalProjectToPhysicalProjectTest.java
@@ -43,7 +43,7 @@ import java.util.List;
 import java.util.Map;
 
 public class LogicalProjectToPhysicalProjectTest {
-    private final Map<String, Rule> rulesMap
+    private static final Map<String, Rule> rulesMap
             = ImmutableMap.<String, Rule>builder()
             .put(LogicalProject.class.getName(), (new 
LogicalProjectToPhysicalProject()).build())
             .put(LogicalAggregate.class.getName(), (new 
LogicalAggToPhysicalHashAgg()).build())
@@ -53,7 +53,7 @@ public class LogicalProjectToPhysicalProjectTest {
             .put(LogicalSort.class.getName(), (new 
LogicalSortToPhysicalHeapSort()).build())
             .build();
 
-    private PhysicalPlan rewriteLogicalToPhysical(Group group, PlannerContext 
plannerContext) {
+    private static PhysicalPlan rewriteLogicalToPhysical(Group group, 
PlannerContext plannerContext) {
         List<Plan> children = Lists.newArrayList();
         for (Group child : group.getLogicalExpression().children()) {
             children.add(rewriteLogicalToPhysical(child, plannerContext));
@@ -68,16 +68,20 @@ public class LogicalProjectToPhysicalProjectTest {
         return (PhysicalPlan) implPlanNode.withChildren(children);
     }
 
+    public static PhysicalPlan rewriteLogicalToPhysical(LogicalPlan plan) {
+        PlannerContext plannerContext = new Memo(plan)
+                .newPlannerContext(new ConnectContext())
+                .setDefaultJobContext();
+
+        return rewriteLogicalToPhysical(plannerContext.getMemo().getRoot(), 
plannerContext);
+    }
+
     @Test
     public void projectionImplTest() {
-        LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan("a");
+        LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "a", 0);
         LogicalPlan project = new LogicalProject<>(Lists.newArrayList(), scan);
 
-        PlannerContext plannerContext = new Memo(project)
-                .newPlannerContext(new ConnectContext())
-                .setDefaultJobContext();
-
-        PhysicalPlan physicalProject = 
rewriteLogicalToPhysical(plannerContext.getMemo().getRoot(), plannerContext);
+        PhysicalPlan physicalProject = rewriteLogicalToPhysical(project);
         Assertions.assertEquals(PlanType.PHYSICAL_PROJECT, 
physicalProject.getType());
         PhysicalPlan physicalScan = (PhysicalPlan) physicalProject.child(0);
         Assertions.assertEquals(PlanType.PHYSICAL_OLAP_SCAN, 
physicalScan.getType());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
index a22d0f27d2..a6ccb7c7b7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
@@ -18,7 +18,7 @@
 package org.apache.doris.nereids.stats;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.memo.Group;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
@@ -224,16 +224,11 @@ public class StatsCalculatorTest {
                 result = statistics;
             }};
 
-        Table table1 = PlanConstructor.newTable(tableId1, "t1");
+        OlapTable table1 = PlanConstructor.newOlapTable(tableId1, "t1", 0);
         LogicalOlapScan logicalOlapScan1 = new LogicalOlapScan(table1, 
Collections.emptyList()).withLogicalProperties(
-                Optional.of(new LogicalProperties(new Supplier<List<Slot>>() {
-                    @Override
-                    public List<Slot> get() {
-                        return Arrays.asList(slot1);
-                    }
-                })));
+                Optional.of(new LogicalProperties(() -> 
ImmutableList.of(slot1))));
         Group childGroup = new Group();
-        GroupExpression groupExpression = new 
GroupExpression(logicalOlapScan1, Arrays.asList(childGroup));
+        GroupExpression groupExpression = new 
GroupExpression(logicalOlapScan1, ImmutableList.of(childGroup));
         Group ownerGroup = new Group();
         groupExpression.setOwnerGroup(ownerGroup);
         StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
@@ -266,7 +261,7 @@ public class StatsCalculatorTest {
         GroupPlan groupPlan = new GroupPlan(childGroup);
         childGroup.setStatistics(childStats);
 
-        LogicalLimit logicalLimit = new LogicalLimit(1, 2, groupPlan);
+        LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(1, 2, groupPlan);
         GroupExpression groupExpression = new GroupExpression(logicalLimit);
         groupExpression.addChild(childGroup);
         Group ownerGroup = new Group();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
index 2f760214fa..d851435540 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.trees.plans;
 
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.properties.DistributionSpecHash;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
@@ -110,8 +111,8 @@ public class PlanEqualsTest {
 
     @Test
     public void testLogicalOlapScan() {
-        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan("table");
-        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan("table");
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "table", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(0, "table", 0);
 
         Assertions.assertEquals(scan1, scan2);
     }
@@ -182,10 +183,14 @@ public class PlanEqualsTest {
     }
 
     @Test
-    public void testPhysicalOlapScan(@Mocked LogicalProperties logicalProperties, @Mocked OlapTable olapTable) {
+    public void testPhysicalOlapScan(
+            @Mocked LogicalProperties logicalProperties,
+            @Mocked OlapTable olapTable,
+            @Mocked DistributionSpecHash distributionSpecHash) {
         List<String> qualifier = Lists.newArrayList();
 
-        PhysicalOlapScan olapScan = new PhysicalOlapScan(olapTable, qualifier, Optional.empty(), logicalProperties);
+        PhysicalOlapScan olapScan = new PhysicalOlapScan(olapTable, qualifier, distributionSpecHash, Optional.empty(),
+                logicalProperties);
 
         Assertions.assertEquals(olapScan, olapScan);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanOutputTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanOutputTest.java
index f52a3ea005..9bfff12def 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanOutputTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanOutputTest.java
@@ -39,7 +39,7 @@ import java.util.Optional;
 public class PlanOutputTest {
     @Test
     public void testComputeOutput() {
-        LogicalOlapScan relationPlan = PlanConstructor.newLogicalOlapScanWithTable("a");
+        LogicalOlapScan relationPlan = PlanConstructor.newLogicalOlapScan(0, "a", 0);
         List<Slot> output = relationPlan.getOutput();
         Assertions.assertEquals(2, output.size());
         Assertions.assertEquals(output.get(0).getName(), "id");
@@ -67,7 +67,7 @@ public class PlanOutputTest {
 
     @Test
     public void testWithOutput() {
-        LogicalOlapScan relationPlan = PlanConstructor.newLogicalOlapScanWithTable("a");
+        LogicalOlapScan relationPlan = PlanConstructor.newLogicalOlapScan(0, "a", 0);
 
         List<Slot> output = relationPlan.getOutput();
         // column prune
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java
index 3beed0fbc5..e8b7c193aa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java
@@ -19,37 +19,77 @@ package org.apache.doris.nereids.util;
 
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.thrift.TStorageType;
 
 import com.google.common.collect.ImmutableList;
 
+import java.util.List;
+
 public class PlanConstructor {
-    public static Table student = new Table(0L, "student", Table.TableType.OLAP,
-            ImmutableList.<Column>of(new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
-                    new Column("gender", Type.INT, false, AggregateType.NONE, "0", ""),
-                    new Column("name", Type.STRING, true, AggregateType.NONE, "", ""),
-                    new Column("age", Type.INT, true, AggregateType.NONE, "", "")));
+    public static OlapTable student;
+    public static OlapTable score;
+    public static OlapTable course;
+
+    static {
+        student = new OlapTable(0L, "student",
+                ImmutableList.<Column>of(new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("gender", Type.INT, false, AggregateType.NONE, "0", ""),
+                        new Column("name", Type.STRING, true, AggregateType.NONE, "", ""),
+                        new Column("age", Type.INT, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS, null, null);
+        score = new OlapTable(0L, "course",
+                ImmutableList.<Column>of(new Column("sid", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("cid", Type.INT, true, AggregateType.NONE, "", ""),
+                        new Column("grade", Type.DOUBLE, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS, null, null);
+        course = new OlapTable(0L, "course",
+                ImmutableList.<Column>of(new Column("cid", Type.INT, true, AggregateType.NONE, "0", ""),
+                        new Column("name", Type.STRING, true, AggregateType.NONE, "", ""),
+                        new Column("teacher", Type.STRING, true, AggregateType.NONE, "", "")),
+                KeysType.PRIMARY_KEYS, null, null);
+        student.setIndexMeta(-1,
+                "base",
+                student.getFullSchema(),
+                0, 0, (short) 0,
+                TStorageType.COLUMN,
+                KeysType.PRIMARY_KEYS);
+        course.setIndexMeta(-1,
+                "base",
+                course.getFullSchema(),
+                0, 0, (short) 0,
+                TStorageType.COLUMN,
+                KeysType.PRIMARY_KEYS);
+        score.setIndexMeta(-1,
+                "base",
+                score.getFullSchema(),
+                0, 0, (short) 0,
+                TStorageType.COLUMN,
+                KeysType.PRIMARY_KEYS);
+    }
 
-    public static Table score = new Table(0L, "score", Table.TableType.OLAP,
-            ImmutableList.<Column>of(new Column("sid", Type.INT, true, AggregateType.NONE, "0", ""),
-                    new Column("cid", Type.INT, true, AggregateType.NONE, "", ""),
-                    new Column("grade", Type.DOUBLE, true, AggregateType.NONE, "", "")));
+    public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn) {
+        List<Column> columns = ImmutableList.of(
+                new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
+                new Column("name", Type.STRING, true, AggregateType.NONE, "", ""));
 
-    public static Table course = new Table(0L, "course", Table.TableType.OLAP,
-            ImmutableList.<Column>of(new Column("cid", Type.INT, true, AggregateType.NONE, "0", ""),
-                    new Column("name", Type.STRING, true, AggregateType.NONE, "", ""),
-                    new Column("teacher", Type.STRING, true, AggregateType.NONE, "", "")));
+        HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(3,
+                ImmutableList.of(columns.get(hashColumn)));
 
-    public static OlapTable newOlapTable(long tableId, String tableName) {
-        return new OlapTable(0L, tableName,
-                ImmutableList.of(
-                        new Column("id", Type.INT, true, AggregateType.NONE, "0", ""),
-                        new Column("name", Type.STRING, true, AggregateType.NONE, "", "")),
-                KeysType.PRIMARY_KEYS, null, null);
+        OlapTable table = new OlapTable(tableId, tableName, columns,
+                KeysType.PRIMARY_KEYS, null, hashDistributionInfo);
+        table.setIndexMeta(-1,
+                "base",
+                table.getFullSchema(),
+                0, 0, (short) 0,
+                TStorageType.COLUMN,
+                KeysType.PRIMARY_KEYS);
+        return table;
     }
 
     public static Table newTable(long tableId, String tableName) {
@@ -60,13 +100,9 @@ public class PlanConstructor {
                 ));
     }
 
-    // With OlapTable
-    public static LogicalOlapScan newLogicalOlapScan(String tableName) {
-        return new LogicalOlapScan(newOlapTable(0L, tableName), ImmutableList.of("db"));
-    }
-
-    // With Table
-    public static LogicalOlapScan newLogicalOlapScanWithTable(String tableName) {
-        return new LogicalOlapScan(newTable(0L, tableName), ImmutableList.of("db"));
+    // With OlapTable.
+    // Warning: equals() of Table depends on tableId.
+    public static LogicalOlapScan newLogicalOlapScan(long tableId, String tableName, int hashColumn) {
+        return new LogicalOlapScan(newOlapTable(tableId, tableName, hashColumn), ImmutableList.of("db"));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to