xzj7019 commented on code in PR #21114:
URL: https://github.com/apache/doris/pull/21114#discussion_r1243898489


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, 
right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) 
buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = 
ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), 
join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || 
!aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = 
context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = 
context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) 
cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, 
exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), 
olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < 
project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) 
project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) 
project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                
ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), 
olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, 
buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), 
filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> 
cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), 
cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? 
extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = 
ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : 
cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = 
context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new 
LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof 
EqualTo)
+                        || !(((EqualTo) 
join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) 
join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) 
join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && 
left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && 
right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join condition points to the same cte on the 
same cte column.
+                    // collect the other side columns besides cte column side.
+                    Preconditions.checkState(equalTos.size() == 
equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = 
context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, 
equalCondToJoinMap);
+                    } else {
+                        // check further whether the join upper side can bring 
equal set, which
+                        // indicating actually the same runtime filter build 
side
+                        List<Expression> conditions = 
curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) 
((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) 
((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && 
anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, 
equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = 
entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : 
equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);
+                TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
+                if (ctx.getSessionVariable().enablePipelineEngine()) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                EqualTo newEqualTo = ((EqualTo) 
JoinUtils.swapEqualToForChildrenOrder(
+                        equalTo, join.child(0).getOutputSet()));
+                doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, 
exprOrder++, cteProducer);
+            }
+            ctx.getPushedDownCTE().add(cteProducer.getCteId());
+        }
+    }
+
+    private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends 
Plan, ? extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder,
+            PhysicalCTEProducer cteProducer) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
+
+        Preconditions.checkState(inputPlanNode != null);

Review Comment:
   done



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -294,8 +221,466 @@ public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, CascadesContext
         return scan;
     }
 
+    private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, EqualTo equalTo) {
+        AbstractPlan right = (AbstractPlan) join.right();
+        //make ut test friendly
+        if (right.getStats() == null) {
+            return -1L;
+        }
+        ExpressionEstimation estimator = new ExpressionEstimation();
+        ColumnStatistic buildColStats = equalTo.right().accept(estimator, 
right.getStats());
+        return buildColStats.isUnKnown ? -1 : Math.max(1, (long) 
buildColStats.ndv);
+    }
+
     private static Slot checkTargetChild(Expression leftChild) {
         Expression expression = 
ExpressionUtils.getExpressionCoveredByCast(leftChild);
         return expression instanceof Slot ? ((Slot) expression) : null;
     }
+
+    private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values())
+                .filter(type -> (type.getValue() & 
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+                .collect(Collectors.toList());
+        // TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
+        //   we will support it in later version.
+        for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
+            EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
+                    (EqualTo) join.getHashJoinConjuncts().get(i), 
join.left().getOutputSet()));
+            for (TRuntimeFilterType type : legalTypes) {
+                //bitmap rf is generated by nested loop join.
+                if (type == TRuntimeFilterType.BITMAP) {
+                    continue;
+                }
+                if (join.left() instanceof PhysicalUnion
+                        || join.left() instanceof PhysicalIntersect
+                        || join.left() instanceof PhysicalExcept) {
+                    doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
+                } else {
+                    doPushDownBasic(join, context, ctx, equalTo, type, i);
+                }
+            }
+        }
+    }
+
+    private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, CascadesContext context,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        // currently, we can ensure children in the two side are corresponding to the equal_to's.
+        // so right maybe an expression and left is a slot
+        Slot unwrappedSlot = checkTargetChild(equalTo.left());
+        // aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
+        // contains join with denied join type. for example: a left join b on a.id = b.id
+        if (unwrappedSlot == null || 
!aliasTransferMap.containsKey(unwrappedSlot)) {
+            return;
+        }
+        Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
+        PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
+
+        Preconditions.checkState(olapScanSlot != null && scan != null);
+
+        if (scan instanceof PhysicalCTEConsumer) {
+            Set<CTEId> processedCTE = 
context.getRuntimeFilterContext().getProcessedCTE();
+            CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
+            if (!processedCTE.contains(cteId)) {
+                PhysicalCTEProducer cteProducer = 
context.getRuntimeFilterContext()
+                        .getCteProduceMap().get(cteId);
+                PhysicalPlan inputPlanNode = (PhysicalPlan) 
cteProducer.child(0);
+                // process cte producer self recursively
+                inputPlanNode.accept(this, context);
+                processedCTE.add(cteId);
+            }
+        } else {
+            // in-filter is not friendly to pipeline
+            if (type == TRuntimeFilterType.IN_OR_BLOOM
+                    && ctx.getSessionVariable().enablePipelineEngine()
+                    && hasRemoteTarget(join, scan)) {
+                type = TRuntimeFilterType.BLOOM;
+            }
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), ImmutableList.of(olapScanSlot), type, 
exprOrder, join, buildSideNdv);
+            ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+            ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+            
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), 
olapScanSlot);
+        }
+    }
+
+    private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType 
type, int exprOrder) {
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        List<Slot> targetList = new ArrayList<>();
+        int projIndex = -1;
+        for (int j = 0; j < join.left().children().size(); j++) {
+            PhysicalPlan child = (PhysicalPlan) join.left().child(j);
+            if (child instanceof PhysicalProject) {
+                PhysicalProject project = (PhysicalProject) child;
+                Slot leftSlot = checkTargetChild(equalTo.left());
+                if (leftSlot == null) {
+                    break;
+                }
+                for (int k = 0; projIndex < 0 && k < 
project.getProjects().size(); k++) {
+                    NamedExpression expr = (NamedExpression) 
project.getProjects().get(k);
+                    if (expr.getName().equals(leftSlot.getName())) {
+                        projIndex = k;
+                        break;
+                    }
+                }
+                Preconditions.checkState(projIndex >= 0
+                        && projIndex < project.getProjects().size());
+
+                NamedExpression targetExpr = (NamedExpression) 
project.getProjects().get(projIndex);
+
+                SlotReference origSlot = null;
+                if (targetExpr instanceof Alias) {
+                    origSlot = (SlotReference) targetExpr.child(0);
+                } else {
+                    origSlot = (SlotReference) targetExpr;
+                }
+                Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+                PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
+                if (type == TRuntimeFilterType.IN_OR_BLOOM
+                        && ctx.getSessionVariable().enablePipelineEngine()
+                        && hasRemoteTarget(join, scan)) {
+                    type = TRuntimeFilterType.BLOOM;
+                }
+                targetList.add(olapScanSlot);
+                ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+                
ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getId(), 
olapScanSlot);
+            }
+        }
+        if (!targetList.isEmpty()) {
+            long buildSideNdv = getBuildSideNdv(join, equalTo);
+            RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+                    equalTo.right(), targetList, type, exprOrder, join, 
buildSideNdv);
+            for (int j = 0; j < targetList.size(); j++) {
+                ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), 
filter);
+            }
+        }
+    }
+
+    private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? 
extends Plan> join,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Set<CTEId> cteIds = new HashSet<>();
+        PhysicalPlan leftChild = (PhysicalPlan) join.left();
+        PhysicalPlan rightChild = (PhysicalPlan) join.right();
+
+        Preconditions.checkState(leftChild != null && rightChild != null);
+
+        boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
+        boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
+        // only one side cte is allowed currently
+        if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
+            for (CTEId id : cteIds) {
+                if (ctx.getCteToJoinsMap().get(id) == null) {
+                    Set<PhysicalHashJoin> newJoin = new HashSet<>();
+                    newJoin.add(join);
+                    ctx.getCteToJoinsMap().put(id, newJoin);
+                } else {
+                    ctx.getCteToJoinsMap().get(id).add(join);
+                }
+            }
+        }
+        if (!ctx.getCteToJoinsMap().isEmpty()) {
+            analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
+        }
+    }
+
+    private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
+        List<CTEId> candidates = new ArrayList<>();
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
entry : cteRFPushDownMap.entrySet()) {
+            CTEId cteId = entry.getKey().getCteId();
+            if (ctx.getPushedDownCTE().contains(cteId)) {
+                continue;
+            }
+            candidates.add(cteId);
+        }
+        return candidates;
+    }
+
+    private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> 
cteIds) {
+        if (root instanceof PhysicalCTEConsumer) {
+            cteIds.add(((PhysicalCTEConsumer) root).getCteId());
+            return true;
+        } else if (root.children().size() != 1) {
+            // only collect cte in one side
+            return false;
+        } else if (root instanceof PhysicalDistribute
+                || root instanceof PhysicalFilter
+                || root instanceof PhysicalProject) {
+            return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), 
cteIds);
+        } else {
+            return false;
+        }
+    }
+
+    private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? 
extends Plan, ? extends Plan> curJoin,
+            CascadesContext context) {
+        RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+        Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = 
ctx.getCteToJoinsMap();
+        for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : 
cteToJoinsMap.entrySet()) {
+            CTEId cteId = entry.getKey();
+            Set<PhysicalHashJoin> joinSet = entry.getValue();
+            if (joinSet.contains(curJoin)) {
+                // skip current join
+                continue;
+            }
+            Set<LogicalCTEConsumer> cteSet = 
context.getCteIdToConsumers().get(cteId);
+            Preconditions.checkState(!cteSet.isEmpty());
+            String cteName = cteSet.iterator().next().getName();
+            // preconditions for rf pushing into cte producer:
+            // multiple joins whose join condition is on the same cte's column of the same cte
+            // the other side of these join conditions are the same column of the same table, or
+            // they in the same equal sets, such as under an equal join condition
+            // case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
+            //         rf of t1.c1 can be pushed down into cte1 producer
+            // case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 conditions
+            //         and another equal join condition with t1.c1 = t2.c2
+            //         rf of t1.c1 and t2.c2 can be pushed down into cte2 producer
+            if (joinSet.size() != cteSet.size()) {
+                continue;
+            }
+            List<EqualTo> equalTos = new ArrayList<>();
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new 
LinkedHashMap<>();
+            for (PhysicalHashJoin join : joinSet) {
+                // precondition:
+                // 1. no non-equal join condition
+                // 2. only equalTo and slotReference both sides
+                // 3. only support one join condition (will be refined further)
+                if (join.getOtherJoinConjuncts().size() > 1
+                        || join.getHashJoinConjuncts().size() != 1
+                        || !(join.getHashJoinConjuncts().get(0) instanceof 
EqualTo)
+                        || !(((EqualTo) 
join.getHashJoinConjuncts().get(0)).child(0) instanceof SlotReference)
+                        || !(((EqualTo) 
join.getHashJoinConjuncts().get(0)).child(1) instanceof SlotReference)) {
+                    break;
+                } else {
+                    EqualTo equalTo = (EqualTo) 
join.getHashJoinConjuncts().get(0);
+                    equalTos.add(equalTo);
+                    equalCondToJoinMap.put(equalTo, join);
+                }
+            }
+            if (joinSet.size() == equalTos.size()) {
+                int matchNum = 0;
+                Set<String> cteNameSet = new HashSet<>();
+                Set<SlotReference> anotherSideSlotSet = new HashSet<>();
+                for (EqualTo equalTo : equalTos) {
+                    // has been checked above, SlotReference for both sides
+                    SlotReference left = (SlotReference) equalTo.left();
+                    SlotReference right = (SlotReference) equalTo.right();
+                    if (left.getQualifier().size() == 1 && 
left.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(right);
+                        cteNameSet.add(left.getQualifiedName());
+                    } else if (right.getQualifier().size() == 1 && 
right.getQualifier().get(0).equals(cteName)) {
+                        matchNum += 1;
+                        anotherSideSlotSet.add(left);
+                        cteNameSet.add(right.getQualifiedName());
+                    }
+                }
+                if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
+                    // means all join conditions point to the same cte on the 
same cte column.
+                    // collect the other side columns besides the cte column side.
+                    Preconditions.checkState(equalTos.size() == 
equalCondToJoinMap.size());
+
+                    PhysicalCTEProducer cteProducer = 
context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
+                    if (anotherSideSlotSet.size() == 1) {
+                        // meet requirement for pushing down into cte producer
+                        ctx.getCteRFPushDownMap().put(cteProducer, 
equalCondToJoinMap);
+                    } else {
+                        // check further whether the upper side of the join can bring 
an equal set, which
+                        // indicates that these actually share the same runtime filter build 
side
+                        List<Expression> conditions = 
curJoin.getHashJoinConjuncts();
+                        boolean inSameEqualSet = false;
+                        for (Expression e : conditions) {
+                            if (e instanceof EqualTo) {
+                                SlotReference oneSide = (SlotReference) 
((EqualTo) e).left();
+                                SlotReference anotherSide = (SlotReference) 
((EqualTo) e).right();
+                                if (anotherSideSlotSet.contains(oneSide) && 
anotherSideSlotSet.contains(anotherSide)) {
+                                    inSameEqualSet = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if (inSameEqualSet) {
+                            ctx.getCteRFPushDownMap().put(cteProducer, 
equalCondToJoinMap);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
+        Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
cteRFPushDownMap = ctx.getCteRFPushDownMap();
+        for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> 
entry : cteRFPushDownMap.entrySet()) {
+            PhysicalCTEProducer cteProducer = entry.getKey();
+            if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
+                continue;
+            }
+            Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = 
entry.getValue();
+            int exprOrder = 0;
+            for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : 
equalCondToJoinMap.entrySet()) {
+                EqualTo equalTo = innerEntry.getKey();
+                PhysicalHashJoin join = innerEntry.getValue();
+                Preconditions.checkState(cteProducer != null && join != null);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to