This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpch500
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpch500 by this push:
     new 2e99c426a2b Expand rf prune on tpch sf500 (#29357)
2e99c426a2b is described below

commit 2e99c426a2b365e43b66b77b4ab6ca6991e3624a
Author: minghong <engle...@gmail.com>
AuthorDate: Sun Dec 31 23:49:49 2023 +0800

    Expand rf prune on tpch sf500 (#29357)
---
 .../processor/post/RuntimeFilterContext.java       | 62 +++++++++++++++++++--
 .../processor/post/RuntimeFilterPruner.java        | 64 +++++++++++++++++-----
 .../post/RuntimeFilterPrunerForExternalTable.java  | 27 +++++----
 .../plans/physical/PhysicalCatalogRelation.java    |  1 +
 .../trees/plans/physical/PhysicalHashJoin.java     | 16 +++++-
 5 files changed, 140 insertions(+), 30 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index b7858d42768..e123e04a32f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.common.IdGenerator;
 import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.expressions.EqualPredicate;
 import org.apache.doris.nereids.trees.expressions.EqualTo;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -125,7 +126,7 @@ public class RuntimeFilterContext {
 
     private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = 
Maps.newHashMap();
 
-    private final Set<Plan> effectiveSrcNodes = Sets.newHashSet();
+    private final Map<Plan, EffectiveSrcType> effectiveSrcNodes = 
Maps.newHashMap();
 
     // cte to related joins map which can extract common runtime filter to cte 
inside
     private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = 
Maps.newHashMap();
@@ -147,6 +148,30 @@ public class RuntimeFilterContext {
 
     private int targetNullCount = 0;
 
+    private final List<ExpandRF> expandedRF = Lists.newArrayList();
+
+    /**
+     * Info about one runtime filter expanded through an inner join (expand_runtime_filter_by_inner_join).
+     * Entries are recorded in PhysicalHashJoin when an RF probe expression appears on one side of an
+     * inner join's hash conjunct, so the RF can additionally target the other side of that conjunct.
+     */
+    public static class ExpandRF {
+        // the inner hash join whose equality conjunct was used for the expansion
+        // NOTE(review): PhysicalHashJoin passes `this` here, which is explicitly NOT the RF builder node;
+        // the name "buildNode" may be misleading - confirm.
+        public AbstractPhysicalJoin buildNode;
+
+        // relation the RF source expression resolves to via the alias-transfer map; may be null
+        public PhysicalRelation srcNode;
+
+        // relation the original probe expression resolves to via the alias-transfer map; may be null
+        public PhysicalRelation target1;
+
+        // relation the expanded probe expression (other side of `equal`) resolves to
+        public PhysicalRelation target2;
+
+        // the inner-join equality conjunct through which the RF was expanded
+        public EqualPredicate equal;
+
+        /**
+         * Create an expansion record; every argument is stored as-is.
+         */
+        public ExpandRF(AbstractPhysicalJoin buildNode, PhysicalRelation srcNode,
+                        PhysicalRelation target1, PhysicalRelation target2, EqualPredicate equal) {
+            this.buildNode = buildNode;
+            this.srcNode = srcNode;
+            this.target1 = target1;
+            this.target2 = target2;
+            // keep the conjunct as well; without this assignment `equal` would always stay null
+            this.equal = equal;
+        }
+    }
+
     public RuntimeFilterContext(SessionVariable sessionVariable) {
         this.sessionVariable = sessionVariable;
         this.limits = new FilterSizeLimits(sessionVariable);
@@ -291,12 +316,23 @@ public class RuntimeFilterContext {
         targetNullCount++;
     }
 
-    public void addEffectiveSrcNode(Plan node) {
-        effectiveSrcNodes.add(node);
+    /**
+     * Why a plan node counts as an effective runtime-filter source, i.e. where its selectivity comes from:
+     * its own predicate/limit (NATIVE) or runtime filters applied to it (REF).
+     */
+    public enum EffectiveSrcType {
+        // NATIVE: selectivity from the node itself (visible filter, limit, topN, assertNumRows).
+        // REF: selectivity only from runtime filters targeting a scan's slots.
+        // Parents that merely pass a child's effectiveness through inherit the child's type.
+        NATIVE, REF
+    }
+
+    public void addEffectiveSrcNode(Plan node, EffectiveSrcType type) {
+        effectiveSrcNodes.put(node, type);
     }
 
     public boolean isEffectiveSrcNode(Plan node) {
-        return effectiveSrcNodes.contains(node);
+        // membership test only; the recorded NATIVE/REF kind is read via getEffectiveSrcType
+        return effectiveSrcNodes.containsKey(node);
+    }
+
+    public EffectiveSrcType getEffectiveSrcType(Plan plan) {
+        return effectiveSrcNodes.get(plan);
     }
 
     @VisibleForTesting
@@ -319,4 +355,22 @@ public class RuntimeFilterContext {
         }
         return olapSlot;
     }
+
+    /**
+     * Look up the first recorded runtime-filter expansion whose {@code buildNode} equals the given join.
+     *
+     * @param join the join to look up
+     * @return the first matching {@link ExpandRF} in recording order, or {@code null} if {@code join}
+     *         is not a {@link PhysicalHashJoin} or no expansion was recorded for it
+     */
+    public ExpandRF getExpandRfByJoin(AbstractPhysicalJoin join) {
+        if (!(join instanceof PhysicalHashJoin)) {
+            return null;
+        }
+        return expandedRF.stream()
+                .filter(candidate -> candidate.buildNode.equals(join))
+                .findFirst()
+                .orElse(null);
+    }
+
+    /**
+     * Returns the live, mutable list of recorded expansions (not a copy); PhysicalHashJoin appends to it.
+     */
+    public List<ExpandRF> getExpandedRF() {
+        return this.expandedRF;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
index 210ed4f6f32..5005da2a1cf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
@@ -32,9 +32,12 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Statistics;
 
+import com.google.common.collect.Sets;
+
 import java.util.List;
 import java.util.Set;
 
@@ -57,7 +60,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
         if (!plan.children().isEmpty()) {
             plan.child(0).accept(this, context);
             if 
(context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) {
-                context.getRuntimeFilterContext().addEffectiveSrcNode(plan);
+                RuntimeFilterContext.EffectiveSrcType childType = 
context.getRuntimeFilterContext()
+                        .getEffectiveSrcType(plan.child(0));
+                context.getRuntimeFilterContext().addEffectiveSrcNode(plan, 
childType);
             }
         }
         return plan;
@@ -66,13 +71,13 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
     @Override
     public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, 
CascadesContext context) {
         topN.child().accept(this, context);
-        context.getRuntimeFilterContext().addEffectiveSrcNode(topN);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(topN, 
RuntimeFilterContext.EffectiveSrcType.NATIVE);
         return topN;
     }
 
     public PhysicalLimit visitPhysicalLimit(PhysicalLimit<? extends Plan> 
limit, CascadesContext context) {
         limit.child().accept(this, context);
-        context.getRuntimeFilterContext().addEffectiveSrcNode(limit);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(limit, 
RuntimeFilterContext.EffectiveSrcType.NATIVE);
         return limit;
     }
 
@@ -80,11 +85,29 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
     public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends 
Plan, ? extends Plan> join,
             CascadesContext context) {
         join.right().accept(this, context);
-        if 
(context.getRuntimeFilterContext().isEffectiveSrcNode(join.right())) {
-            context.getRuntimeFilterContext().addEffectiveSrcNode(join);
+        RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
+        if (rfContext.isEffectiveSrcNode(join.right())) {
+            boolean enableExpand = false;
+            if (ConnectContext.get() != null) {
+                enableExpand = 
ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin;
+            }
+            if (enableExpand && rfContext.getEffectiveSrcType(join.right())
+                    == RuntimeFilterContext.EffectiveSrcType.REF) {
+                RuntimeFilterContext.ExpandRF expand = 
rfContext.getExpandRfByJoin(join);
+                if (expand != null) {
+                    Set<ExprId> outputExprIdOfExpandTargets = 
Sets.newHashSet();
+                    
outputExprIdOfExpandTargets.addAll(expand.target1.getOutputExprIds());
+                    
outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds());
+                    rfContext.getTargetExprIdByFilterJoin(join)
+                            .stream().filter(exprId -> 
outputExprIdOfExpandTargets.contains(exprId))
+                            .forEach(exprId -> rfContext.removeFilter(exprId, 
join));
+                }
+            }
+            RuntimeFilterContext.EffectiveSrcType childType =
+                    rfContext.getEffectiveSrcType(join.right());
+            context.getRuntimeFilterContext().addEffectiveSrcNode(join, 
childType);
         } else {
-            RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-            List<ExprId> exprIds = ctx.getTargetExprIdByFilterJoin(join);
+            List<ExprId> exprIds = rfContext.getTargetExprIdByFilterJoin(join);
             if (exprIds != null && !exprIds.isEmpty()) {
                 boolean isEffective = false;
                 for (Expression expr : join.getEqualToConjuncts()) {
@@ -93,13 +116,21 @@ public class RuntimeFilterPruner extends PlanPostProcessor 
{
                     }
                 }
                 if (!isEffective) {
-                    exprIds.stream().forEach(exprId -> 
context.getRuntimeFilterContext().removeFilter(exprId, join));
+                    exprIds.stream().forEach(exprId -> 
rfContext.removeFilter(exprId, join));
                 }
             }
         }
         join.left().accept(this, context);
-        if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.left())) 
{
-            context.getRuntimeFilterContext().addEffectiveSrcNode(join);
+        if (rfContext.isEffectiveSrcNode(join.left())) {
+            RuntimeFilterContext.EffectiveSrcType leftType =
+                    rfContext.getEffectiveSrcType(join.left());
+            RuntimeFilterContext.EffectiveSrcType rightType =
+                    rfContext.getEffectiveSrcType(join.right());
+            if (rightType == null
+                    || (rightType == RuntimeFilterContext.EffectiveSrcType.REF
+                        && leftType == 
RuntimeFilterContext.EffectiveSrcType.NATIVE)) {
+                rfContext.addEffectiveSrcNode(join, leftType);
+            }
         }
         return join;
     }
@@ -122,7 +153,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
                 .anyMatch(slot -> isVisibleColumn(slot));
         if (visibleFilter) {
             // skip filters like: __DORIS_DELETE_SIGN__ = 0
-            context.getRuntimeFilterContext().addEffectiveSrcNode(filter);
+            context.getRuntimeFilterContext().addEffectiveSrcNode(filter, 
RuntimeFilterContext.EffectiveSrcType.NATIVE);
         }
         return filter;
     }
@@ -134,7 +165,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
         for (Slot slot : slots) {
             //if this scan node is the target of any effective RF, it is 
effective source
             if 
(!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) {
-                context.getRuntimeFilterContext().addEffectiveSrcNode(scan);
+                context.getRuntimeFilterContext().addEffectiveSrcNode(scan, 
RuntimeFilterContext.EffectiveSrcType.REF);
                 break;
             }
         }
@@ -145,20 +176,23 @@ public class RuntimeFilterPruner extends 
PlanPostProcessor {
     public PhysicalAssertNumRows 
visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
             CascadesContext context) {
         assertNumRows.child().accept(this, context);
-        context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows);
+        context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows,
+                RuntimeFilterContext.EffectiveSrcType.NATIVE);
         return assertNumRows;
     }
 
     @Override
     public PhysicalHashAggregate 
visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> aggregate,
                                                             CascadesContext 
context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
         aggregate.child().accept(this, context);
         // q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x
         // q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z
         // RF on q1 is not effective, but RF on q2 is. But q1 is a more 
generous pattern, and hence agg is not
         // regarded as an effective source. Let this RF judge by ndv.
-        if 
(context.getRuntimeFilterContext().isEffectiveSrcNode(aggregate.child(0))) {
-            context.getRuntimeFilterContext().addEffectiveSrcNode(aggregate);
+        if (ctx.isEffectiveSrcNode(aggregate.child(0))) {
+            RuntimeFilterContext.EffectiveSrcType childType = 
ctx.getEffectiveSrcType(aggregate.child());
+            ctx.addEffectiveSrcNode(aggregate, childType);
         }
         return aggregate;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
index dd104173b21..0a0cfe04ec6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
@@ -97,7 +97,8 @@ public class RuntimeFilterPrunerForExternalTable extends 
PlanPostProcessor {
                                                   CascadesContext context) {
         join.right().accept(this, context);
         join.right().setMutableState(MutableState.KEY_PARENT, join);
-        join.setMutableState(MutableState.KEY_RF_JUMP, 
join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
+        join.setMutableState(MutableState.KEY_RF_JUMP,
+                join.right().getMutableState(MutableState.KEY_RF_JUMP).get());
         join.left().accept(this, context);
         join.left().setMutableState(MutableState.KEY_PARENT, join);
         return join;
@@ -121,15 +122,18 @@ public class RuntimeFilterPrunerForExternalTable extends 
PlanPostProcessor {
         Plan cursor = scan;
         Optional<Plan> parent = 
cursor.getMutableState(MutableState.KEY_PARENT);
         while (parent.isPresent()) {
-            if (joinAndAncestors.contains(parent.get())) {
-                Optional oi = 
parent.get().getMutableState(MutableState.KEY_RF_JUMP);
-                if (oi.isPresent() && ConnectContext.get() != null
-                        && (int) (oi.get()) > 
ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
-                    return true;
-                }
-            } else {
-                if (isBuildSide(parent.get(), cursor)) {
-                    return false;
+            if (parent.get() instanceof Join) {
+                if (joinAndAncestors.contains(parent.get())) {
+                    Optional oi = 
parent.get().getMutableState(MutableState.KEY_RF_JUMP);
+                    if (oi.isPresent() && ConnectContext.get() != null
+                            && (int) (oi.get())
+                            > 
ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) {
+                        return true;
+                    }
+                } else {
+                    if (isBuildSide(parent.get(), cursor)) {
+                        return false;
+                    }
                 }
             }
             cursor = parent.get();
@@ -148,6 +152,9 @@ public class RuntimeFilterPrunerForExternalTable extends 
PlanPostProcessor {
             Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP);
             if (oi.isPresent()) {
                 int jump = (Integer) (oi.get());
+                if (child instanceof Join) {
+                    jump++;
+                }
                 if (jump > maxJump) {
                     maxJump = jump;
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
index 5537b4dd7c6..060075deb6c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java
@@ -138,6 +138,7 @@ public abstract class PhysicalCatalogRelation extends 
PhysicalRelation implement
             getAppliedRuntimeFilters()
                     .stream().forEach(rf -> shapeBuilder.append(" 
RF").append(rf.getId().asInt()));
         }
+        //shapeBuilder.append("jump: 
").append(getMutableState(MutableState.KEY_RF_JUMP));
         return shapeBuilder.toString();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 39462be71ef..2fbb3356ae1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -210,17 +210,31 @@ public class PhysicalHashJoin<
                 "join child node is null");
 
         Set<Expression> probExprList = Sets.newHashSet(probeExpr);
+        Pair<PhysicalRelation, Slot> pair = 
ctx.getAliasTransferMap().get(probeExpr);
+        PhysicalRelation target1 = (pair == null) ? null : pair.first;
+        PhysicalRelation target2 = null;
+        pair = ctx.getAliasTransferMap().get(srcExpr);
+        PhysicalRelation srcNode = (pair == null) ? null : pair.first;
         if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) {
             if (!this.equals(builderNode) && this.getJoinType() == 
JoinType.INNER_JOIN) {
                 for (Expression expr : this.getHashJoinConjuncts()) {
                     EqualPredicate equalTo = (EqualPredicate) expr;
                     if (probeExpr.equals(equalTo.left())) {
                         probExprList.add(equalTo.right());
+                        pair = ctx.getAliasTransferMap().get(equalTo.right());
+                        target2 = (pair == null) ? null : pair.first;
                     } else if (probeExpr.equals(equalTo.right())) {
                         probExprList.add(equalTo.left());
+                        pair = ctx.getAliasTransferMap().get(equalTo.left());
+                        target2 = (pair == null) ? null : pair.first;
+                    }
+                    if (target2 != null) {
+                        ctx.getExpandedRF().add(
+                            new RuntimeFilterContext.ExpandRF(this, srcNode, 
target1, target2, equalTo));
                     }
                 }
                 probExprList.remove(srcExpr);
+
             }
         }
         for (Expression prob : probExprList) {
@@ -260,7 +274,7 @@ public class PhysicalHashJoin<
             builder.append(" build RFs:").append(runtimeFilters.stream()
                     .map(rf -> 
rf.shapeInfo()).collect(Collectors.joining(";")));
         }
-        // builder.append("jump: 
").append(getMutableState(MutableState.KEY_RF_JUMP));
+        builder.append(" jump: 
").append(getMutableState(MutableState.KEY_RF_JUMP));
         return builder.toString();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to