morrySnow commented on code in PR #12182:
URL: https://github.com/apache/doris/pull/12182#discussion_r965475500


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.rules.analysis.OlapScanNodeId;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {
+
+    private final IdGenerator<RuntimeFilterId> generator = 
RuntimeFilterId.createGenerator();
+
+    // exprId of target to runtime filter.
+    private final Map<ExprId, List<RuntimeFilter>> targetExprIdToFilter = 
Maps.newHashMap();
+
+    // olap scan node that contains target of a runtime filter.
+    private final Map<OlapScanNodeId, List<SlotReference>> 
targetOnOlapScanNodeMap = Maps.newHashMap();
+
+    private final List<org.apache.doris.planner.RuntimeFilter> legacyFilters = 
Lists.newArrayList();
+
+    // exprId to olap scan node slotRef because the slotRef will be changed 
when translating.
+    private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
+
+    private final Map<ExprId, Slot> hashJoinExprToOlapScanSlot = 
Maps.newHashMap();
+
+    // Alias's child to itself.
+    private final Map<Slot, NamedExpression> aliasChildToSelf = 
Maps.newHashMap();
+
+    private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget 
= Maps.newHashMap();
+
+    private final SessionVariable sessionVariable;
+
+    private final FilterSizeLimits limits;
+
+    private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+            JoinType.LEFT_ANTI_JOIN,
+            JoinType.RIGHT_ANTI_JOIN,
+            JoinType.FULL_OUTER_JOIN,
+            JoinType.LEFT_OUTER_JOIN
+    );
+
+    /**
+     * the runtime filter generator run at the phase of post process and plan 
translation of nereids planner.
+     * post process:
+     * first step: if encounter supported join type, generate nereids runtime 
filter for all the hash conjunctions
+     * and make association from exprId of the target slot references to the 
runtime filter. or delete the runtime
+     * filter whose target slot reference is one of the output slot references 
of the left child of the physical join as
+     * the runtime filter.
+     * second step: if encounter project, collect the association of its child 
and it for pushing down through
+     * the project node.
+     * plan translation:
+     * third step: generate nereids runtime filter target at olap scan node 
fragment.
+     * forth step: generate legacy runtime filter target and runtime filter at 
hash join node fragment.
+     */
+
+    public RuntimeFilterGenerator(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+        this.limits = new FilterSizeLimits(sessionVariable);
+    }
+
+    // TODO: current support inner join, cross join, right outer join, and 
will support more join type.
+    @Override
+    public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
+            CascadesContext ctx) {
+        if (deniedJoinType.contains(join.getJoinType())) {
+            /* TODO: translate left outer join to inner join if there are 
inner join ancestors
+             * if it has encountered inner join, like
+             *                       a=b
+             *                      /   \
+             *                     /     \
+             *                    /       \
+             *                   /         \
+             *      left join-->a=c         b
+             *                  / \
+             *                 /   \
+             *                /     \
+             *               /       \
+             *              a         c
+             * runtime filter whose src expr is b can take effect on c.
+             * but now checking the inner join is unsupported. we may support 
it at later version.
+             */
+            join.getOutput().forEach(slot -> 
targetExprIdToFilter.remove(slot.getExprId()));
+        } else {
+            List<TRuntimeFilterType> legalTypes = 
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
+                    (type.getValue() & sessionVariable.getRuntimeFilterType()) 
> 0).collect(Collectors.toList());
+            AtomicInteger cnt = new AtomicInteger();
+            join.getHashJoinConjuncts().stream()
+                    .map(EqualTo.class::cast)
+                    .peek(expr -> {
+                        // target is always the expr at the two side of equal 
of hash conjunctions.
+                        List<SlotReference> slots = 
expr.children().stream().filter(SlotReference.class::isInstance)
+                                
.map(SlotReference.class::cast).collect(Collectors.toList());
+                        if (slots.size() != 2 || 
!(targetExprIdToFilter.containsKey(slots.get(0).getExprId())
+                                || 
targetExprIdToFilter.containsKey(slots.get(1).getExprId()))) {
+                            return;
+                        }
+                        int tag = 
targetExprIdToFilter.containsKey(slots.get(0).getExprId()) ? 0 : 1;
+                        targetExprIdToFilter.computeIfAbsent(slots.get(tag ^ 
1).getExprId(),
+                                k -> 
targetExprIdToFilter.get(slots.get(tag).getExprId()).stream()
+                                        .map(filter -> new 
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
+                                        slots.get(tag ^ 1), filter.getType(), 
filter.getExprOrder()))
+                                        .collect(Collectors.toList()));
+                    })
+                    .forEach(expr -> legalTypes.stream()
+                            .map(type -> 
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
+                                    type, cnt.getAndIncrement(), join))
+                            .filter(Objects::nonNull)
+                            .forEach(filter -> 
targetExprIdToFilter.computeIfAbsent(
+                                    filter.getTargetExpr().getExprId(), k -> 
new ArrayList<>()).add(filter)));
+        }
+        join.left().accept(this, ctx);
+        join.right().accept(this, ctx);
+        return join;
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext ctx) {
+        project.getProjects().stream().filter(Alias.class::isInstance)
+                .map(Alias.class::cast)
+                .filter(expr -> expr.child() instanceof SlotReference)
+                .forEach(expr -> aliasChildToSelf.put(((SlotReference) 
expr.child()), expr));
+        project.child().accept(this, ctx);
+        return project;
+    }
+
+    @Override
+    public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext ctx) {
+        scan.getOutput().stream()
+                .filter(slot -> 
getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
+                        .filter(expr -> 
targetExprIdToFilter.containsKey(expr.getExprId()))
+                        .peek(expr -> {
+                            List<RuntimeFilter> filters = 
targetExprIdToFilter.get(expr.getExprId());
+                            targetExprIdToFilter.remove(expr.getExprId());
+                            filters.forEach(filter -> 
filter.setTargetSlot(slot));
+                            targetExprIdToFilter.put(slot.getExprId(), 
filters);
+                            hashJoinExprToOlapScanSlot.put(expr.getExprId(), 
slot);
+                        })
+                        .count() > 0)
+                .forEach(slot -> 
targetOnOlapScanNodeMap.computeIfAbsent(scan.id, k -> new ArrayList<>())
+                        .add((SlotReference) slot));
+        return scan;
+    }
+
+    /**
+     * Translate nereids runtime filter on hash join node
+     * translate the runtime filter whose target expression id is one of the 
output slot reference of the left child of
+     * the physical hash join.
+     * @param node hash join node
+     * @param ctx plan translator context
+     */
+    public void translateRuntimeFilter(Slot slot, HashJoinNode node, 
PlanTranslatorContext ctx) {
+        ExprId id = slot.getExprId();
+        legacyFilters.addAll(targetExprIdToFilter.get(id).stream()
+                .filter(RuntimeFilter::isUninitialized)
+                .map(filter -> createLegacyRuntimeFilter(filter, node, ctx))
+                .collect(Collectors.toList()));
+    }
+
+    public List<SlotReference> getTargetOnOlapScanNode(OlapScanNodeId id) {

Review Comment:
   ```suggestion
       public List<SlotReference> getTargetOnScanNode(OlapScanNodeId id) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java:
##########
@@ -43,8 +44,16 @@ public PhysicalPlan process(PhysicalPlan physicalPlan) {
         return resultPlan;
     }
 
+    /**
+     * generate processors
+     * @return processors

Review Comment:
   ```suggestion
        * generate processors
        *
        * @return processors
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java:
##########
@@ -44,17 +44,19 @@
 public class LogicalOlapScanToPhysicalOlapScan extends 
OneImplementationRuleFactory {
     @Override
     public Rule build() {
-        return logicalOlapScan().then(olapScan ->
-            new PhysicalOlapScan(
+        return logicalOlapScan().thenApply(ctx -> {
+            LogicalOlapScan olapScan = ctx.root;
+            return new PhysicalOlapScan(
+                    ctx.statementContext.generator.getNextId(),

Review Comment:
   we need to generate the id when bind relation. and let logical and physical 
relation has the same id



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java:
##########
@@ -63,8 +62,9 @@ public LogicalOlapScan(Table table, List<String> qualifier) {
      * @param table Doris table
      * @param qualifier table name qualifier
      */
-    public LogicalOlapScan(Table table, List<String> qualifier, 
Optional<GroupExpression> groupExpression,
-            Optional<LogicalProperties> logicalProperties, List<Long> 
selectedPartitionIdList,
+    public LogicalOlapScan(Table table, List<String> qualifier,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
+            List<Long> selectedPartitionIdList,

Review Comment:
   useless change



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.rules.analysis.OlapScanNodeId;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.planner.HashJoinNode;
+import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
+import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
+import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TRuntimeFilterType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * generate runtime filter
+ */
+public class RuntimeFilterGenerator extends PlanPostprocessor {

Review Comment:
   split it into two class. move translate related attribute to 
`RuntimeFilterTranslatorContext` and add it into `PlanTranslatorContext`
   add a new class `RuntimeFilterTranslator` to do runtime filter translate



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java:
##########
@@ -56,6 +57,8 @@ public class CascadesContext {
     // subqueryExprIsAnalyzed: whether the subquery has been analyzed.
     private Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
 
+    private RuntimeFilterGenerator runtimeFilterGenerator;

Review Comment:
   final



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java:
##########
@@ -57,7 +80,7 @@ public PhysicalOlapScan(OlapTable olapTable, List<String> 
qualifier, long select
     /**
      * Constructor for PhysicalOlapScan.
      */
-    public PhysicalOlapScan(OlapTable olapTable, List<String> qualifier, long 
selectedIndexId,
+    public PhysicalOlapScan(OlapScanNodeId id, OlapTable olapTable, 
List<String> qualifier, long selectedIndexId,

Review Comment:
   i think it is better to use a new PR to submit the feature about RelationId



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java:
##########
@@ -38,7 +38,6 @@
  * Logical OlapScan.
  */
 public class LogicalOlapScan extends LogicalRelation {
-

Review Comment:
   add back



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/OlapScanNodeId.java:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.analysis;
+
+import org.apache.doris.common.Id;
+import org.apache.doris.common.IdGenerator;
+
+import java.util.Objects;
+
+/**
+ * id of logical olap scan node and physical olap scan node.
+ */
+public class OlapScanNodeId extends Id<OlapScanNodeId> {

Review Comment:
   rename it to RelationId



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to