xzj7019 commented on code in PR #21114:
URL: https://github.com/apache/doris/pull/21114#discussion_r1243939098


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, 
right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) 
buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = 
ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), 
join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || 
!aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = 
context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = 
context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) 
cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, 
exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), 
olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < 
project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) 
project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) 
project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                
ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), 
olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, 
buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), 
filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> 
cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to