This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpch500 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpch500 by this push: new 2e99c426a2b Expand rf prune on tpch sf500 (#29357) 2e99c426a2b is described below commit 2e99c426a2b365e43b66b77b4ab6ca6991e3624a Author: minghong <engle...@gmail.com> AuthorDate: Sun Dec 31 23:49:49 2023 +0800 Expand rf prune on tpch sf500 (#29357) --- .../processor/post/RuntimeFilterContext.java | 62 +++++++++++++++++++-- .../processor/post/RuntimeFilterPruner.java | 64 +++++++++++++++++----- .../post/RuntimeFilterPrunerForExternalTable.java | 27 +++++---- .../plans/physical/PhysicalCatalogRelation.java | 1 + .../trees/plans/physical/PhysicalHashJoin.java | 16 +++++- 5 files changed, 140 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index b7858d42768..e123e04a32f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; import org.apache.doris.nereids.trees.expressions.CTEId; +import org.apache.doris.nereids.trees.expressions.EqualPredicate; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; @@ -125,7 +126,7 @@ public class RuntimeFilterContext { private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = Maps.newHashMap(); - private final Set<Plan> effectiveSrcNodes = Sets.newHashSet(); + private final Map<Plan, EffectiveSrcType> effectiveSrcNodes = Maps.newHashMap(); // cte to related joins map which can extract common runtime filter to cte inside private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newHashMap(); @@ -147,6 +148,30 @@ public class RuntimeFilterContext { private int targetNullCount = 0; + private final List<ExpandRF> expandedRF = Lists.newArrayList(); + + /** + * info about expand rf by inner join + */ + public static class ExpandRF { + public AbstractPhysicalJoin buildNode; + + public PhysicalRelation srcNode; + public PhysicalRelation target1; + + public PhysicalRelation target2; + + public EqualPredicate equal; + + public ExpandRF(AbstractPhysicalJoin buildNode, PhysicalRelation srcNode, + PhysicalRelation target1, PhysicalRelation target2, EqualPredicate equal) { + this.buildNode = buildNode; + this.srcNode = srcNode; + this.target1 = target1; + this.target2 = target2; + } + } + public RuntimeFilterContext(SessionVariable sessionVariable) { this.sessionVariable = sessionVariable; this.limits = new FilterSizeLimits(sessionVariable); @@ -291,12 +316,23 @@ public class RuntimeFilterContext { targetNullCount++; } - public void addEffectiveSrcNode(Plan node) { - effectiveSrcNodes.add(node); + /** + * the selectivity produced by predicate or rf + */ + public enum EffectiveSrcType { + NATIVE, REF + } + + public void addEffectiveSrcNode(Plan node, EffectiveSrcType type) { + effectiveSrcNodes.put(node, type); } public boolean isEffectiveSrcNode(Plan node) { - return effectiveSrcNodes.contains(node); + return effectiveSrcNodes.keySet().contains(node); + } + + public EffectiveSrcType getEffectiveSrcType(Plan plan) { + return effectiveSrcNodes.get(plan); } @VisibleForTesting @@ -319,4 +355,22 @@ public class RuntimeFilterContext { } return olapSlot; } + + /** + * return the info about expand_runtime_filter_by_inner_join + */ + public ExpandRF getExpandRfByJoin(AbstractPhysicalJoin join) { + if (join instanceof PhysicalHashJoin) { + for (ExpandRF expand : expandedRF) { + if (expand.buildNode.equals(join)) { + return expand; + } + } + } + return null; + } + + public List<ExpandRF> getExpandedRF() { + return expandedRF; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java index 210ed4f6f32..5005da2a1cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java @@ -32,9 +32,12 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Statistics; +import com.google.common.collect.Sets; + import java.util.List; import java.util.Set; @@ -57,7 +60,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor { if (!plan.children().isEmpty()) { plan.child(0).accept(this, context); if (context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) { - context.getRuntimeFilterContext().addEffectiveSrcNode(plan); + RuntimeFilterContext.EffectiveSrcType childType = context.getRuntimeFilterContext() + .getEffectiveSrcType(plan.child(0)); + context.getRuntimeFilterContext().addEffectiveSrcNode(plan, childType); } } return plan; @@ -66,13 +71,13 @@ public class RuntimeFilterPruner extends PlanPostProcessor { @Override public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) { topN.child().accept(this, context); - context.getRuntimeFilterContext().addEffectiveSrcNode(topN); + context.getRuntimeFilterContext().addEffectiveSrcNode(topN, RuntimeFilterContext.EffectiveSrcType.NATIVE); return topN; } public PhysicalLimit visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, CascadesContext context) { limit.child().accept(this, context); - context.getRuntimeFilterContext().addEffectiveSrcNode(limit); + context.getRuntimeFilterContext().addEffectiveSrcNode(limit, RuntimeFilterContext.EffectiveSrcType.NATIVE); return limit; } @@ -80,11 +85,29 @@ public class RuntimeFilterPruner extends PlanPostProcessor { public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context) { join.right().accept(this, context); - if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.right())) { - context.getRuntimeFilterContext().addEffectiveSrcNode(join); + RuntimeFilterContext rfContext = context.getRuntimeFilterContext(); + if (rfContext.isEffectiveSrcNode(join.right())) { + boolean enableExpand = false; + if (ConnectContext.get() != null) { + enableExpand = ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin; + } + if (enableExpand && rfContext.getEffectiveSrcType(join.right()) + == RuntimeFilterContext.EffectiveSrcType.REF) { + RuntimeFilterContext.ExpandRF expand = rfContext.getExpandRfByJoin(join); + if (expand != null) { + Set<ExprId> outputExprIdOfExpandTargets = Sets.newHashSet(); + outputExprIdOfExpandTargets.addAll(expand.target1.getOutputExprIds()); + outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds()); + rfContext.getTargetExprIdByFilterJoin(join) + .stream().filter(exprId -> outputExprIdOfExpandTargets.contains(exprId)) + .forEach(exprId -> rfContext.removeFilter(exprId, join)); + } + } + RuntimeFilterContext.EffectiveSrcType childType = + rfContext.getEffectiveSrcType(join.right()); + context.getRuntimeFilterContext().addEffectiveSrcNode(join, childType); } else { - RuntimeFilterContext ctx = context.getRuntimeFilterContext(); - List<ExprId> exprIds = ctx.getTargetExprIdByFilterJoin(join); + List<ExprId> exprIds = rfContext.getTargetExprIdByFilterJoin(join); if (exprIds != null && !exprIds.isEmpty()) { boolean isEffective = false; for (Expression expr : join.getEqualToConjuncts()) { @@ -93,13 +116,21 @@ public class RuntimeFilterPruner extends PlanPostProcessor { } } if (!isEffective) { - exprIds.stream().forEach(exprId -> context.getRuntimeFilterContext().removeFilter(exprId, join)); + exprIds.stream().forEach(exprId -> rfContext.removeFilter(exprId, join)); } } } join.left().accept(this, context); - if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.left())) { - context.getRuntimeFilterContext().addEffectiveSrcNode(join); + if (rfContext.isEffectiveSrcNode(join.left())) { + RuntimeFilterContext.EffectiveSrcType leftType = + rfContext.getEffectiveSrcType(join.left()); + RuntimeFilterContext.EffectiveSrcType rightType = + rfContext.getEffectiveSrcType(join.right()); + if (rightType == null + || (rightType == RuntimeFilterContext.EffectiveSrcType.REF + && leftType == RuntimeFilterContext.EffectiveSrcType.NATIVE)) { + rfContext.addEffectiveSrcNode(join, leftType); + } } return join; } @@ -122,7 +153,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor { .anyMatch(slot -> isVisibleColumn(slot)); if (visibleFilter) { // skip filters like: __DORIS_DELETE_SIGN__ = 0 - context.getRuntimeFilterContext().addEffectiveSrcNode(filter); + context.getRuntimeFilterContext().addEffectiveSrcNode(filter, RuntimeFilterContext.EffectiveSrcType.NATIVE); } return filter; } @@ -134,7 +165,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor { for (Slot slot : slots) { //if this scan node is the target of any effective RF, it is effective source if (!rfCtx.getTargetExprIdToFilter().get(slot.getExprId()).isEmpty()) { - context.getRuntimeFilterContext().addEffectiveSrcNode(scan); + context.getRuntimeFilterContext().addEffectiveSrcNode(scan, RuntimeFilterContext.EffectiveSrcType.REF); break; } } @@ -145,20 +176,23 @@ public class RuntimeFilterPruner extends PlanPostProcessor { public PhysicalAssertNumRows visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows, CascadesContext context) { assertNumRows.child().accept(this, context); - context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows); + context.getRuntimeFilterContext().addEffectiveSrcNode(assertNumRows, + RuntimeFilterContext.EffectiveSrcType.NATIVE); return assertNumRows; } @Override public PhysicalHashAggregate visitPhysicalHashAggregate(PhysicalHashAggregate<? extends Plan> aggregate, CascadesContext context) { + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); aggregate.child().accept(this, context); // q1: A join (select x, sum(y) as z from B group by x) T on A.a = T.x // q2: A join (select x, sum(y) as z from B group by x) T on A.a = T.z // RF on q1 is not effective, but RF on q2 is. But q1 is a more generous pattern, and hence agg is not // regarded as an effective source. Let this RF judge by ndv. - if (context.getRuntimeFilterContext().isEffectiveSrcNode(aggregate.child(0))) { - context.getRuntimeFilterContext().addEffectiveSrcNode(aggregate); + if (ctx.isEffectiveSrcNode(aggregate.child(0))) { + RuntimeFilterContext.EffectiveSrcType childType = ctx.getEffectiveSrcType(aggregate.child()); + ctx.addEffectiveSrcNode(aggregate, childType); } return aggregate; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java index dd104173b21..0a0cfe04ec6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java @@ -97,7 +97,8 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { CascadesContext context) { join.right().accept(this, context); join.right().setMutableState(MutableState.KEY_PARENT, join); - join.setMutableState(MutableState.KEY_RF_JUMP, join.right().getMutableState(MutableState.KEY_RF_JUMP).get()); + join.setMutableState(MutableState.KEY_RF_JUMP, + join.right().getMutableState(MutableState.KEY_RF_JUMP).get()); join.left().accept(this, context); join.left().setMutableState(MutableState.KEY_PARENT, join); return join; @@ -121,15 +122,18 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { Plan cursor = scan; Optional<Plan> parent = cursor.getMutableState(MutableState.KEY_PARENT); while (parent.isPresent()) { - if (joinAndAncestors.contains(parent.get())) { - Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP); - if (oi.isPresent() && ConnectContext.get() != null - && (int) (oi.get()) > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) { - return true; - } - } else { - if (isBuildSide(parent.get(), cursor)) { - return false; + if (parent.get() instanceof Join) { + if (joinAndAncestors.contains(parent.get())) { + Optional oi = parent.get().getMutableState(MutableState.KEY_RF_JUMP); + if (oi.isPresent() && ConnectContext.get() != null + && (int) (oi.get()) + > ConnectContext.get().getSessionVariable().runtimeFilterJumpThreshold) { + return true; + } + } else { + if (isBuildSide(parent.get(), cursor)) { + return false; + } } } cursor = parent.get(); @@ -148,6 +152,9 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { Optional oi = child.getMutableState(MutableState.KEY_RF_JUMP); if (oi.isPresent()) { int jump = (Integer) (oi.get()); + if (child instanceof Join) { + jump++; + } if (jump > maxJump) { maxJump = jump; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java index 5537b4dd7c6..060075deb6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCatalogRelation.java @@ -138,6 +138,7 @@ public abstract class PhysicalCatalogRelation extends PhysicalRelation implement getAppliedRuntimeFilters() .stream().forEach(rf -> shapeBuilder.append(" RF").append(rf.getId().asInt())); } + //shapeBuilder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP)); return shapeBuilder.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java index 39462be71ef..2fbb3356ae1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java @@ -210,17 +210,31 @@ public class PhysicalHashJoin< "join child node is null"); Set<Expression> probExprList = Sets.newHashSet(probeExpr); + Pair<PhysicalRelation, Slot> pair = ctx.getAliasTransferMap().get(probeExpr); + PhysicalRelation target1 = (pair == null) ? null : pair.first; + PhysicalRelation target2 = null; + pair = ctx.getAliasTransferMap().get(srcExpr); + PhysicalRelation srcNode = (pair == null) ? null : pair.first; if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) { if (!this.equals(builderNode) && this.getJoinType() == JoinType.INNER_JOIN) { for (Expression expr : this.getHashJoinConjuncts()) { EqualPredicate equalTo = (EqualPredicate) expr; if (probeExpr.equals(equalTo.left())) { probExprList.add(equalTo.right()); + pair = ctx.getAliasTransferMap().get(equalTo.right()); + target2 = (pair == null) ? null : pair.first; } else if (probeExpr.equals(equalTo.right())) { probExprList.add(equalTo.left()); + pair = ctx.getAliasTransferMap().get(equalTo.left()); + target2 = (pair == null) ? null : pair.first; + } + if (target2 != null) { + ctx.getExpandedRF().add( + new RuntimeFilterContext.ExpandRF(this, srcNode, target1, target2, equalTo)); } } probExprList.remove(srcExpr); + } } for (Expression prob : probExprList) { @@ -260,7 +274,7 @@ public class PhysicalHashJoin< builder.append(" build RFs:").append(runtimeFilters.stream() .map(rf -> rf.shapeInfo()).collect(Collectors.joining(";"))); } - // builder.append("jump: ").append(getMutableState(MutableState.KEY_RF_JUMP)); + builder.append(" jump: ").append(getMutableState(MutableState.KEY_RF_JUMP)); return builder.toString(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org