This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4822b9811a [feature](nereids)support bitmap runtime filter on nereids (#16927) 4822b9811a is described below commit 4822b9811a32b5014f8f363e4f6027a23e6bca0c Author: minghong <engle...@gmail.com> AuthorDate: Thu Mar 9 09:30:24 2023 +0800 [feature](nereids)support bitmap runtime filter on nereids (#16927) * A in(B) -> bitmap_contains(bitmap_union(B), A) support bitmap runtime filter on nereids * GroupPlan -> Plan * fmt * fix target cast problem remove test code --- .../glue/translator/PhysicalPlanTranslator.java | 28 ++++-- .../glue/translator/RuntimeFilterTranslator.java | 65 ++++++++++-- .../processor/post/RuntimeFilterContext.java | 9 +- .../processor/post/RuntimeFilterGenerator.java | 58 +++++++++++ .../rules/exploration/join/JoinCommute.java | 26 +++++ .../rules/rewrite/logical/InApplyToJoin.java | 66 ++++++++++-- .../nereids/trees/plans/logical/LogicalFilter.java | 1 - .../plans/physical/PhysicalNestedLoopJoin.java | 21 ++++ .../trees/plans/physical/RuntimeFilter.java | 31 +++++- .../org/apache/doris/nereids/util/JoinUtils.java | 35 +++++++ .../org/apache/doris/planner/RuntimeFilter.java | 2 +- .../java/org/apache/doris/qe/SessionVariable.java | 5 + .../query_p0/join/test_bitmap_filter_nereids.out | 112 +++++++++++++++++++++ .../join/test_bitmap_filter_nereids.groovy | 95 +++++++++++++++++ 14 files changed, 524 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5db3b71fd1..79eff09095 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -106,6 +106,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; +import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.ExpressionUtils; @@ -1182,9 +1183,22 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla joinFragment = createPlanFragment(nestedLoopJoinNode, DataPartition.UNPARTITIONED, nestedLoopJoin); context.addPlanFragment(joinFragment); + connectChildFragment(nestedLoopJoinNode, 0, joinFragment, leftFragment, context); } else { joinFragment = leftFragment; + nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot()); + joinFragment.setPlanRoot(nestedLoopJoinNode); } + // translate runtime filter + context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> { + List<RuntimeFilter> filters = runtimeFilterTranslator + .getRuntimeFilterOfHashJoinNode(nestedLoopJoin); + filters.forEach(filter -> runtimeFilterTranslator + .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, context)); + if (!filters.isEmpty()) { + nestedLoopJoinNode.setOutputLeftSideOnly(true); + } + }); Map<ExprId, SlotReference> leftChildOutputMap = Maps.newHashMap(); Stream.concat(nestedLoopJoin.child(0).getOutput().stream(), @@ -1277,15 +1291,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla nestedLoopJoinNode.setvIntermediateTupleDescList(Lists.newArrayList(intermediateDescriptor)); rightFragment.getPlanRoot().setCompactData(false); - if (needNewRootFragment) { - connectChildFragment(nestedLoopJoinNode, 0, joinFragment, leftFragment, context); - } else { - nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot()); - joinFragment.setPlanRoot(nestedLoopJoinNode); - } + connectChildFragment(nestedLoopJoinNode, 1, joinFragment, rightFragment, context); List<Expr> joinConjuncts = nestedLoopJoin.getOtherJoinConjuncts().stream() + .filter(e -> !nestedLoopJoin.isBitmapRuntimeFilterCondition(e)) .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); + + if (!nestedLoopJoin.isBitMapRuntimeFilterConditionsEmpty() && joinConjuncts.isEmpty()) { + //left semi join need at least one conjunct. otherwise left-semi-join fallback to cross-join + joinConjuncts.add(new BoolLiteral(true)); + } + nestedLoopJoinNode.setJoinConjuncts(joinConjuncts); nestedLoopJoin.getFilterConjuncts().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 78b2dbdf25..65714c027c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -22,21 +22,27 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleId; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.processor.post.RuntimeFilterContext; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.RelationId; -import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; +import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; import org.apache.doris.planner.ScanNode; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.List; +import java.util.Map; /** * translate runtime filter @@ -50,7 +56,7 @@ public class RuntimeFilterTranslator { context.generatePhysicalHashJoinToRuntimeFilter(); } - public List<RuntimeFilter> getRuntimeFilterOfHashJoinNode(PhysicalHashJoin join) { + public List<RuntimeFilter> getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) { return context.getRuntimeFilterOnHashJoinNode(join); } @@ -68,38 +74,81 @@ public class RuntimeFilterTranslator { context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node); } + private class RuntimeFilterExpressionTranslator extends ExpressionTranslator { + Map<ExprId, SlotRef> nereidsExprIdToSlotRef; + + RuntimeFilterExpressionTranslator(Map<ExprId, SlotRef> nereidsExprIdToSlotRef) { + this.nereidsExprIdToSlotRef = nereidsExprIdToSlotRef; + } + + @Override + public Expr visitSlotReference(SlotReference slotReference, PlanTranslatorContext context) { + SlotRef slot = nereidsExprIdToSlotRef.get(slotReference.getExprId()); + if (slot == null) { + throw new AnalysisException("cannot find SlotRef for " + slotReference); + } + return slot; + } + } + /** * generate legacy runtime filter * @param filter nereids runtime filter * @param node hash join node * @param ctx plan translator context */ - public void createLegacyRuntimeFilter(RuntimeFilter filter, HashJoinNode node, PlanTranslatorContext ctx) { + public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, PlanTranslatorContext ctx) { Expr target = context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId()); if (target == null) { context.setTargetNullCount(); return; } + Expr targetExpr = null; + if (filter.getType() == TRuntimeFilterType.BITMAP) { + if (filter.getTargetExpression().equals(filter.getTargetExpr())) { + targetExpr = target; + } else { + RuntimeFilterExpressionTranslator translator = new RuntimeFilterExpressionTranslator( + context.getExprIdToOlapScanNodeSlotRef()); + try { + targetExpr = filter.getTargetExpression().accept(translator, ctx); + targetExpr.finalizeForNereids(); + } catch (org.apache.doris.common.AnalysisException e) { + throw new AnalysisException( + "Translate Nereids expression to stale expression failed. " + e.getMessage(), e); + } + + } + } else { + targetExpr = target; + } + Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx); SlotRef targetSlot = target.getSrcSlotRef(); TupleId targetTupleId = targetSlot.getDesc().getParent().getId(); SlotId targetSlotId = targetSlot.getSlotId(); // adjust data type - if (!src.getType().equals(target.getType())) { - target = new CastExpr(src.getType(), target); + if (!src.getType().equals(target.getType()) && filter.getType() != TRuntimeFilterType.BITMAP) { + targetExpr = new CastExpr(src.getType(), targetExpr); } org.apache.doris.planner.RuntimeFilter origFilter = org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter( - filter.getId(), node, src, filter.getExprOrder(), target, + filter.getId(), node, src, filter.getExprOrder(), targetExpr, ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)), filter.getType(), context.getLimits()); - origFilter.setIsBroadcast(node.getDistributionMode() == DistributionMode.BROADCAST); + if (node instanceof HashJoinNode) { + origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST); + } else { + //bitmap rf requires isBroadCast=false, it always requires merge filter + origFilter.setIsBroadcast(false); + } ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(filter.getTargetExpr()); origFilter.addTarget(new RuntimeFilterTarget( scanNode, - target, + targetExpr, true, scanNode.getFragmentId().equals(node.getFragmentId()))); + origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn()); context.getLegacyFilters().add(finalize(origFilter)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index e00775f520..7658b34908 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter; import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits; @@ -64,7 +65,7 @@ public class RuntimeFilterContext { // exprId to olap scan node slotRef because the slotRef will be changed when translating. private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = Maps.newHashMap(); - private final Map<PhysicalHashJoin, List<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap(); + private final Map<AbstractPhysicalJoin, List<RuntimeFilter>> runtimeFilterOnHashJoinNode = Maps.newHashMap(); // alias -> alias's child, if there's a key that is alias's child, the key-value will change by this way // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv will be C -> A. @@ -132,7 +133,7 @@ public class RuntimeFilterContext { return scanNodeOfLegacyRuntimeFilterTarget; } - public List<RuntimeFilter> getRuntimeFilterOnHashJoinNode(PhysicalHashJoin join) { + public List<RuntimeFilter> getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) { return runtimeFilterOnHashJoinNode.getOrDefault(join, Collections.emptyList()); } @@ -185,11 +186,11 @@ public class RuntimeFilterContext { return targetNullCount; } - public void addJoinToTargetMap(PhysicalHashJoin join, ExprId exprId) { + public void addJoinToTargetMap(AbstractPhysicalJoin join, ExprId exprId) { joinToTargetExprId.computeIfAbsent(join, k -> Lists.newArrayList()).add(exprId); } - public List<ExprId> getTargetExprIdByFilterJoin(PhysicalHashJoin join) { + public List<ExprId> getTargetExprIdByFilterJoin(AbstractPhysicalJoin join) { return joinToTargetExprId.get(join); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 2629d93c74..d787f628cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -24,11 +24,14 @@ import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; @@ -94,6 +97,10 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( (EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet())); for (TRuntimeFilterType type : legalTypes) { + //bitmap rf is generated by nested loop join. + if (type == TRuntimeFilterType.BITMAP) { + continue; + } // currently, we can ensure children in the two side are corresponding to the equal_to's. // so right maybe an expression and left is a slot or cast(slot) Slot unwrappedSlot = checkTargetChild(equalTo.left()); @@ -114,6 +121,57 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { return join; } + @Override + public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join, + CascadesContext context) { + if (join.getJoinType() != JoinType.LEFT_SEMI_JOIN && join.getJoinType() != JoinType.CROSS_JOIN) { + return join; + } + RuntimeFilterContext ctx = context.getRuntimeFilterContext(); + Map<NamedExpression, Pair<RelationId, Slot>> aliasTransferMap = ctx.getAliasTransferMap(); + join.right().accept(this, context); + join.left().accept(this, context); + + if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) == 0) { + //only generate BITMAP filter for nested loop join + return join; + } + List<Slot> leftSlots = join.left().getOutput(); + List<Slot> rightSlots = join.right().getOutput(); + List<Expression> bitmapRuntimeFilterConditions = JoinUtils.extractBitmapRuntimeFilterConditions(leftSlots, + rightSlots, join.getOtherJoinConjuncts()); + if (!JoinUtils.extractExpressionForHashTable(leftSlots, rightSlots, join.getOtherJoinConjuncts()) + .first.isEmpty()) { + return join; + } + int bitmapRFCount = bitmapRuntimeFilterConditions.size(); + for (int i = 0; i < bitmapRFCount; i++) { + Expression bitmapRuntimeFilterCondition = bitmapRuntimeFilterConditions.get(i); + boolean isNot = bitmapRuntimeFilterCondition instanceof Not; + BitmapContains bitmapContains = null; + if (bitmapRuntimeFilterCondition instanceof Not) { + bitmapContains = (BitmapContains) bitmapRuntimeFilterCondition.child(0); + } else { + bitmapContains = (BitmapContains) bitmapRuntimeFilterCondition; + } + TRuntimeFilterType type = TRuntimeFilterType.BITMAP; + Set<Slot> targetSlots = bitmapContains.child(1).getInputSlots(); + for (Slot targetSlot : targetSlots) { + if (targetSlot != null && aliasTransferMap.containsKey(targetSlot)) { + Slot olapScanSlot = aliasTransferMap.get(targetSlot).second; + RuntimeFilter filter = new RuntimeFilter(generator.getNextId(), + bitmapContains.child(0), olapScanSlot, + bitmapContains.child(1), type, i, join, isNot); + ctx.addJoinToTargetMap(join, olapScanSlot.getExprId()); + ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot); + join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition); + } + } + } + return join; + } + @Override public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext context) { project.child().accept(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java index d9110fb7d7..367b00530e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java @@ -20,9 +20,14 @@ package org.apache.doris.nereids.rules.exploration.join; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.exploration.OneExplorationRuleFactory; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TRuntimeFilterType; import java.util.List; @@ -46,6 +51,7 @@ public class JoinCommute extends OneExplorationRuleFactory { return logicalJoin() .when(join -> check(swapType, join)) .whenNot(LogicalJoin::hasJoinHint) + .whenNot(join -> joinOrderMatchBitmapRuntimeFilterOrder(join)) .whenNot(LogicalJoin::isMarkJoin) .then(join -> { LogicalJoin<GroupPlan, GroupPlan> newJoin = new LogicalJoin<>( @@ -93,4 +99,24 @@ public class JoinCommute extends OneExplorationRuleFactory { List<Slot> output = groupPlan.getOutput(); return !output.stream().map(Slot::getQualifier).allMatch(output.get(0).getQualifier()::equals); } + + /** + * bitmap runtime filter requires bitmap column on right. + */ + private boolean joinOrderMatchBitmapRuntimeFilterOrder(LogicalJoin<GroupPlan, GroupPlan> join) { + if (!ConnectContext.get().getSessionVariable().isRuntimeFilterTypeEnabled(TRuntimeFilterType.BITMAP)) { + return false; + } + for (Expression expr : join.getOtherJoinConjuncts()) { + if (expr instanceof Not) { + expr = expr.child(0); + } + if (expr instanceof BitmapContains) { + BitmapContains bitmapContains = (BitmapContains) expr; + return (join.right().getOutputSet().containsAll(bitmapContains.child(0).getInputSlots()) + && join.left().getOutputSet().containsAll(bitmapContains.child(1).getInputSlots())); + } + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java index d08125406a..ce95589a7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/InApplyToJoin.java @@ -21,17 +21,26 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.InSubquery; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.functions.agg.BitmapUnion; +import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.JoinHint; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalApply; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; -import org.apache.doris.nereids.types.BitmapType; +import org.apache.doris.nereids.types.BigIntType; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeCoercionUtils; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.util.List; @@ -46,6 +55,50 @@ public class InApplyToJoin extends OneRewriteRuleFactory { @Override public Rule build() { return logicalApply().when(LogicalApply::isIn).then(apply -> { + if (needBitmapUnion(apply)) { + if (apply.isCorrelated()) { + throw new AnalysisException("In bitmap does not support correlated subquery"); + } + /* + case 1: in + select t1.k1 from bigtable t1 where t1.k1 in (select t2.k2 from bitmap_table t2); + => + select t1.k1 from bigtable t1 where t1.k1 in (select bitmap_union(k2) from bitmap_table t2); + => + select t1.k1 from bigtable t1 left semi join (select bitmap_union(k2) x from bitmap_table ) t2 + on bitmap_contains(x, t1.k1); + + case 2: not in + select t1.k1 from bigtable t1 where t1.k1 not in (select t2.k2 from bitmap_table t2); + => + select t1.k1 from bigtable t1 where t1.k1 not in (select bitmap_union(k2) from bitmap_table t2); + => + select t1.k1 from bigtable t1 left semi join (select bitmap_union(k2) x from bitmap_table ) t2 + on not bitmap_contains(x, t1.k1); + */ + List<Expression> groupExpressions = ImmutableList.of(); + Expression bitmapCol = apply.right().getOutput().get(0); + BitmapUnion union = new BitmapUnion(bitmapCol); + Alias alias = new Alias(union, union.toSql()); + List<NamedExpression> outputExpressions = Lists.newArrayList(alias); + + LogicalAggregate agg = new LogicalAggregate(groupExpressions, outputExpressions, apply.right()); + Expression compareExpr = ((InSubquery) apply.getSubqueryExpr()).getCompareExpr(); + if (!compareExpr.getDataType().isBigIntType()) { + //this rule is after type coercion, we need to add cast by hand + compareExpr = new Cast(compareExpr, BigIntType.INSTANCE); + } + Expression expr = new BitmapContains(agg.getOutput().get(0), compareExpr); + if (((InSubquery) apply.getSubqueryExpr()).isNot()) { + expr = new Not(expr); + } + return new LogicalJoin<>(JoinType.LEFT_SEMI_JOIN, Lists.newArrayList(), + Lists.newArrayList(expr), + JoinHint.NONE, + apply.left(), agg); + } + + //in-predicate to equal Expression predicate; Expression left = ((InSubquery) apply.getSubqueryExpr()).getCompareExpr(); Expression right = apply.right().getOutput().get(0); @@ -61,13 +114,7 @@ public class InApplyToJoin extends OneRewriteRuleFactory { if (apply.getSubCorrespondingConject().isPresent()) { predicate = ExpressionUtils.and(predicate, apply.getSubCorrespondingConject().get()); } - - //TODO nereids should support bitmap runtime filter in future List<Expression> conjuncts = ExpressionUtils.extractConjunction(predicate); - if (conjuncts.stream().anyMatch(expression -> expression.children().stream() - .anyMatch(expr -> expr.getDataType() == BitmapType.INSTANCE))) { - throw new AnalysisException("nereids don't support bitmap runtime filter"); - } if (((InSubquery) apply.getSubqueryExpr()).isNot()) { return new LogicalJoin<>( predicate.nullable() ? JoinType.NULL_AWARE_LEFT_ANTI_JOIN : JoinType.LEFT_ANTI_JOIN, @@ -83,4 +130,9 @@ public class InApplyToJoin extends OneRewriteRuleFactory { } }).toRule(RuleType.IN_APPLY_TO_JOIN); } + + private boolean needBitmapUnion(LogicalApply<Plan, Plan> apply) { + return apply.right().getOutput().get(0).getDataType().isBitmapType() + && !((InSubquery) apply.getSubqueryExpr()).getCompareExpr().getDataType().isBitmapType(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java index b0dd2827bf..20a80ba614 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFilter.java @@ -149,5 +149,4 @@ public class LogicalFilter<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_T public boolean isSingleTableExpressionExtracted() { return singleTableExpressionExtracted; } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java index 808744c05a..131a6b40df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalNestedLoopJoin.java @@ -31,9 +31,11 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.StatsDeriveResult; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.List; import java.util.Optional; +import java.util.Set; /** * Use nested loop algorithm to do join. @@ -43,6 +45,13 @@ public class PhysicalNestedLoopJoin< RIGHT_CHILD_TYPE extends Plan> extends AbstractPhysicalJoin<LEFT_CHILD_TYPE, RIGHT_CHILD_TYPE> { + /* + bitmap_contains(...) or Not(bitmap_contains(...)) can be used as bitmap runtime filter condition + bitmapRF is different from other RF in that scan node must wait for it. + if a condition is used in rf, it can be removed from join conditions. we collect these conditions here. + */ + private final Set<Expression> bitMapRuntimeFilterConditions = Sets.newHashSet(); + public PhysicalNestedLoopJoin( JoinType joinType, List<Expression> hashJoinConjuncts, @@ -145,4 +154,16 @@ public class PhysicalNestedLoopJoin< hashJoinConjuncts, otherJoinConjuncts, markJoinSlotReference, Optional.empty(), getLogicalProperties(), physicalProperties, statsDeriveResult, left(), right()); } + + public void addBitmapRuntimeFilterCondition(Expression expr) { + bitMapRuntimeFilterConditions.add(expr); + } + + public boolean isBitmapRuntimeFilterCondition(Expression expr) { + return bitMapRuntimeFilterConditions.contains(expr); + } + + public boolean isBitMapRuntimeFilterConditionsEmpty() { + return bitMapRuntimeFilterConditions.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index 8d97065932..239b9d6737 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -30,21 +30,37 @@ public class RuntimeFilter { private final RuntimeFilterId id; private final TRuntimeFilterType type; private final Expression srcSlot; + //bitmap filter support target expression like k1+1, abs(k1) + //targetExpression is an expression on targetSlot, in which there is only one non-const slot + private Expression targetExpression; private Slot targetSlot; private final int exprOrder; - private PhysicalHashJoin builderNode; + private AbstractPhysicalJoin builderNode; + + private boolean bitmapFilterNotIn; /** * constructor */ public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, TRuntimeFilterType type, - int exprOrder, PhysicalHashJoin builderNode) { + int exprOrder, AbstractPhysicalJoin builderNode) { + this(id, src, target, target, type, exprOrder, builderNode, false); + } + + /** + * constructor + */ + public RuntimeFilter(RuntimeFilterId id, Expression src, Slot target, Expression targetExpression, + TRuntimeFilterType type, + int exprOrder, AbstractPhysicalJoin builderNode, boolean bitmapFilterNotIn) { this.id = id; this.srcSlot = src; this.targetSlot = target; + this.targetExpression = targetExpression; this.type = type; this.exprOrder = exprOrder; this.builderNode = builderNode; + this.bitmapFilterNotIn = bitmapFilterNotIn; } public Expression getSrcExpr() { @@ -67,7 +83,16 @@ public class RuntimeFilter { return exprOrder; } - public PhysicalHashJoin getBuilderNode() { + public AbstractPhysicalJoin getBuilderNode() { return builderNode; } + + public boolean isBitmapFilterNotIn() { + return bitmapFilterNotIn; + } + + public Expression getTargetExpression() { + return targetExpression; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index 7600656c64..662c7838e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -27,7 +27,9 @@ import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Join; @@ -136,6 +138,39 @@ public class JoinUtils { ); } + /** + * This is used for bitmap runtime filter only. + * Extract bitmap_contains conjunct: + * like: bitmap_contains(a, b) and ..., Not(bitmap_contains(a, b)) and ..., + * where `a` and `b` are from right child and left child, respectively. + * + * @return condition for bitmap runtime filter: bitmap_contains + */ + public static List<Expression> extractBitmapRuntimeFilterConditions(List<Slot> leftSlots, + List<Slot> rightSlots, List<Expression> onConditions) { + List<Expression> result = Lists.newArrayList(); + for (Expression expr : onConditions) { + BitmapContains bitmapContains = null; + if (expr instanceof Not) { + List<Expression> notChildren = ExpressionUtils.extractConjunction(expr.child(0)); + if (notChildren.size() == 1 && notChildren.get(0) instanceof BitmapContains) { + bitmapContains = (BitmapContains) notChildren.get(0); + } + } else if (expr instanceof BitmapContains) { + bitmapContains = (BitmapContains) expr; + } + if (bitmapContains == null) { + continue; + } + //first child in right, second child in left + if (leftSlots.containsAll(bitmapContains.child(1).collect(Slot.class::isInstance)) + && rightSlots.containsAll(bitmapContains.child(0).collect(Slot.class::isInstance))) { + result.add(expr); + } + } + return result; + } + /** * Get all used slots from onClause of join. * Return pair of left used slots and right used slots. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index ad92f2245f..58d0086ed6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -148,7 +148,7 @@ public final class RuntimeFilter { } // only for nereids planner - public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, HashJoinNode node, Expr srcExpr, + public static RuntimeFilter fromNereidsRuntimeFilter(RuntimeFilterId id, JoinNodeBase node, Expr srcExpr, int exprOrder, Expr origTargetExpr, Map<TupleId, List<SlotId>> targetSlots, TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) { return new RuntimeFilter(id, node, srcExpr, exprOrder, origTargetExpr, targetSlots, type, filterSizeLimits); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2ece4ede37..4d7cb98994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.metrics.EventSwitchParser; import org.apache.doris.qe.VariableMgr.VarAttr; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; +import org.apache.doris.thrift.TRuntimeFilterType; import com.google.common.base.Joiner; import com.google.common.base.Strings; @@ -1278,6 +1279,10 @@ public class SessionVariable implements Serializable, Writable { return runtimeFilterType; } + public boolean isRuntimeFilterTypeEnabled(TRuntimeFilterType type) { + return (runtimeFilterType & type.getValue()) == type.getValue(); + } + public void setRuntimeFilterType(int runtimeFilterType) { this.runtimeFilterType = runtimeFilterType; } diff --git a/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out b/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out new file mode 100644 index 0000000000..d71b056b27 --- /dev/null +++ b/regression-test/data/query_p0/join/test_bitmap_filter_nereids.out @@ -0,0 +1,112 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 1989 +3 1989 +5 1985 +7 -32767 +9 1991 +10 1991 +11 1989 +12 32767 +13 -32767 +14 255 + +-- !sql2 -- +2 1986 +4 1991 +6 32767 +8 255 +9 1991 +10 1991 +11 1989 +12 32767 +13 -32767 + +-- !sql3 -- +2 1986 +4 1991 +6 32767 +8 255 +10 1991 +12 32767 +14 255 +15 1992 + +-- !sql4 -- +1 1989 +3 1989 +5 1985 +7 -32767 +9 1991 +11 1989 +13 -32767 + +-- !sql5 -- +1 1989 +3 1989 +7 -32767 +11 1989 +13 -32767 + +-- !sql6 -- +-32767 2 +255 1 +1985 1 +1989 3 +1991 2 +32767 1 + +-- !sql7 -- + +-- !sql8 -- +11 11 + +-- !sql9 -- +2 11 + +-- !sql10 -- + +-- !sql11 -- +1991-08-11 +1991-08-11 +2012-03-14 +2015-04-02 +2015-04-02 +2015-04-02 +2015-04-02 + +-- !sql12 -- +1 +3 +5 +7 +9 +10 +11 +12 +13 +14 +255 +1985 +1991 +32767 + +-- !sql13 -- +10 1991 + +-- !sql14 -- +1 1989 +10 1991 + +-- !sql15 -- +1 1 +3 1 +5 1 +7 1 +9 1 +10 1 +11 1 +12 1 +13 1 +14 1 + diff --git a/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy new file mode 100644 index 0000000000..82528a44b0 --- /dev/null +++ b/regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_bitmap_filter_nereids") { + def tbl1 = "test_query_db.bigtable" + def tbl2 = "bitmap_table_nereids" + def tbl3 = "test_query_db.baseall" + + sql "set runtime_filter_type = 16" + + sql "DROP TABLE IF EXISTS ${tbl2}" + sql """ + CREATE TABLE ${tbl2} ( + `k1` int(11) NULL, + `k2` bitmap BITMAP_UNION NULL, + `k3` bitmap BITMAP_UNION NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 2 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + insert into ${tbl2} values + (1, bitmap_from_string('1, 3, 5, 7, 9, 11, 13, 99, 19910811, 20150402'), + bitmap_from_string('32767, 1985, 255, 789, 1991')), + (2, bitmap_from_string('10, 11, 12, 13, 14'), bitmap_empty());""" + + sql "set enable_nereids_planner=true;" + sql "set enable_fallback_to_original_planner=false;" + + qt_sql1 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;" + + qt_sql2 "select k1, k2 from ${tbl1} where k1 + 1 in (select k2 from ${tbl2}) order by k1;" + + qt_sql3 "select k1, k2 from ${tbl1} where k1 not in (select k2 from ${tbl2} where k1 = 1) order by k1;" + + qt_sql4 "select t1.k1, t1.k2 from ${tbl1} t1 join ${tbl3} t3 on t1.k1 = t3.k1 where t1.k1 in (select k2 from ${tbl2} where k1 = 1) order by t1.k1;" + + qt_sql5 "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) and k2 not in (select k3 from ${tbl2}) order by k1;" + + qt_sql6 "select k2, count(k2) from ${tbl1} where k1 in (select k2 from ${tbl2}) group by k2 order by k2;" + + qt_sql7 "select k1, k2 from (select 2 k1, 2 k2) t where k1 in (select k2 from ${tbl2}) order by 1, 2;" + + qt_sql8 "select k1, k2 from (select 11 k1, 11 k2) t where k1 in (select k2 from ${tbl2}) order by 1, 2;" + + qt_sql9 "select k1, k2 from (select 2 k1, 11 k2) t where k1 not in (select k2 from ${tbl2}) order by 1, 2;" + + qt_sql10 "select k1, k2 from (select 1 k1, 11 k2) t where k1 not in (select k2 from ${tbl2}) order by 1, 2;" + + qt_sql11 "select k10 from ${tbl1} where cast(k10 as bigint) in (select bitmap_or(k2, to_bitmap(20120314)) from ${tbl2} b) order by 1;" + + qt_sql12 """ + with w1 as (select k1 from ${tbl1} where k1 in (select k2 from ${tbl2})), w2 as (select k2 from ${tbl1} where k2 in (select k3 from ${tbl2})) + select * from (select * from w1 union select * from w2) tmp order by 1; + """ + + qt_sql13 "select k1, k2 from ${tbl1} where k1 in (select to_bitmap(10)) order by 1, 2" + + qt_sql14 "select k1, k2 from ${tbl1} where k1 in (select bitmap_from_string('1,10')) order by 1, 2" + + qt_sql15 "select k1, count(*) from ${tbl1} b1 group by k1 having k1 in (select k2 from ${tbl2} b2) order by k1;" + + explain{ + sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;" + contains "RF000[bitmap]" + } + + explain{ + sql "select k1, k2 from ${tbl1} where k1 not in (select k2 from ${tbl2} where k1 = 1)" + contains "RF000[bitmap]" + } + + explain{ + sql " select k1, k2 from (select 2 k1, 2 k2) t where k1 in (select k2 from ${tbl2})" + notContains "RF000[bitmap]" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org