lincoln-lil commented on code in PR #22827:
URL: https://github.com/apache/flink/pull/22827#discussion_r1236728489


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java:
##########
@@ -352,8 +358,25 @@ public void testProjectionIncludingOnlyMetadata() {
                 .containsExactly("metadata");
     }
 
+    private void replaceProgramWithProjectMergeRule() {
+        FlinkChainedProgram programs = new 
FlinkChainedProgram<BatchOptimizeContext>();
+        programs.addLast(
+                "rules",
+                
FlinkHepRuleSetProgramBuilder.<BatchOptimizeContext>newBuilder()
+                        
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                        .add(
+                                RuleSets.ofList(
+                                        CoreRules.PROJECT_MERGE,
+                                        
PushProjectIntoTableSourceScanRule.INSTANCE))
+                        .build());
+        util().replaceBatchProgram(programs);
+    }

Review Comment:
   The AST changes once we disable project merge during the sql2rel phase by 
default in the 1st commit, and this rule test would then fail, so this change 
should stay in the 1st commit.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCalcMergeRule.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Extends calcite's ProjectCalcMergeRule, modification: does not merge the 
filter references field
+ * which generated by non-deterministic function.
+ */
+public class FlinkProjectCalcMergeRule extends ProjectCalcMergeRule {
+
+    public static final RelOptRule INSTANCE = new 
FlinkProjectCalcMergeRule(Config.DEFAULT);
+
+    protected FlinkProjectCalcMergeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalProject project = call.rel(0);
+        LogicalCalc calc = call.rel(1);
+
+        List<RexNode> expandProjects =
+                calc.getProgram().getProjectList().stream()
+                        .map(p -> calc.getProgram().expandLocalRef(p))
+                        .collect(Collectors.toList());
+        InputRefVisitor inputRefVisitor = new InputRefVisitor();
+        project.getProjects().forEach(p -> p.accept(inputRefVisitor));
+        boolean existNonDeterministicRef =
+                Arrays.stream(inputRefVisitor.getFields())
+                        .anyMatch(i -> 
!RexUtil.isDeterministic(expandProjects.get(i)));
+
+        if (!existNonDeterministicRef) {
+            super.onMatch(call);
+        }
+    }
+}

Review Comment:
   Makes sense, I'll update it.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -650,6 +654,75 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  /**
+   * Return two neighbouring [[Project]] can merge into one [[Project]] or 
not. If the two
+   * [[Project]] can merge into one, each non-deterministic [[RexNode]] of 
bottom [[Project]] should
+   * appear at most once in the project list of top [[Project]].
+   */
+  def isMergeable(topProject: Project, bottomProject: Project): Boolean = {
+    val topInputRefCounter: Array[Int] =
+      Array.fill(topProject.getInput.getRowType.getFieldCount)(0)
+
+    mergeable(topInputRefCounter, topProject.getProjects, 
bottomProject.getProjects)
+  }
+
+  /**
+   * An InputRefCounter that count every inputRef's reference count number, 
every reference will be
+   * counted, e.g., '$0 + 1' & '$0 + 2' will count 2 instead of 1.
+   * @param deep
+   * @param refCounts
+   */
+  private class InputRefCounter(deep: Boolean, val refCounts: Array[Int])
+    extends RexVisitorImpl[Void](deep: Boolean) {
+    override def visitInputRef(inputRef: RexInputRef): Void = {
+      val index = inputRef.getIndex
+      refCounts(index) += 1
+      null
+    }
+  }
+
+  private def mergeable(

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -650,6 +654,75 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  /**
+   * Return two neighbouring [[Project]] can merge into one [[Project]] or 
not. If the two
+   * [[Project]] can merge into one, each non-deterministic [[RexNode]] of 
bottom [[Project]] should
+   * appear at most once in the project list of top [[Project]].
+   */
+  def isMergeable(topProject: Project, bottomProject: Project): Boolean = {
+    val topInputRefCounter: Array[Int] =
+      Array.fill(topProject.getInput.getRowType.getFieldCount)(0)
+
+    mergeable(topInputRefCounter, topProject.getProjects, 
bottomProject.getProjects)
+  }
+
+  /**
+   * An InputRefCounter that count every inputRef's reference count number, 
every reference will be
+   * counted, e.g., '$0 + 1' & '$0 + 2' will count 2 instead of 1.
+   * @param deep
+   * @param refCounts
+   */
+  private class InputRefCounter(deep: Boolean, val refCounts: Array[Int])
+    extends RexVisitorImpl[Void](deep: Boolean) {
+    override def visitInputRef(inputRef: RexInputRef): Void = {
+      val index = inputRef.getIndex
+      refCounts(index) += 1
+      null
+    }
+  }
+
+  private def mergeable(
+      topInputRefCounter: Array[Int],
+      topProjects: JList[RexNode],
+      bottomProjects: JList[RexNode]): Boolean = {
+    RexUtil.apply(new InputRefCounter(true, topInputRefCounter), topProjects, 
null)
+
+    bottomProjects.zipWithIndex
+      .map {
+        case (p, idx) =>
+          if (!RexUtil.isDeterministic(p)) {
+            topInputRefCounter(idx)
+          } else 0
+      }
+      .forall(cnt => cnt <= 1)
+  }
+
+  /**
+   * Return two neighbouring [[Calc]] can merge into one [[Calc]] or not. If 
the two [[Calc]] can
+   * merge into one, each non-deterministic [[RexNode]] of bottom [[Calc]] 
should appear at most
+   * once in the project list and filter list of top [[Calc]].
+   */
+  def isMergeable(topCalc: Calc, bottomCalc: Calc): Boolean = {
+    val topProgram = topCalc.getProgram
+    val bottomProgram = bottomCalc.getProgram
+    val topInputRefCounter: Array[Int] =
+      Array.fill(topCalc.getInput.getRowType.getFieldCount)(0)
+
+    val topInputRefs = if (null != topProgram.getCondition) {
+      topProgram.getProjectList.map(topProgram.expandLocalRef) :+ 
topProgram.expandLocalRef(
+        topProgram.getCondition)
+    } else {
+      topProgram.getProjectList.map(topProgram.expandLocalRef)
+    }.toList
+
+    mergeable(
+      topInputRefCounter,
+      topInputRefs.toList,
+      bottomProgram.getProjectList
+        .map(bottomProgram.expandLocalRef)
+        .toList)
+  }

Review Comment:
   OK, this encourages me to create a new class `FlinkRelUtil`, which is more 
appropriate here than `FlinkRexUtil`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectCalcMergeRule.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Extends calcite's ProjectCalcMergeRule, modification: does not merge the 
filter references field
+ * which generated by non-deterministic function.
+ */
+public class FlinkProjectCalcMergeRule extends ProjectCalcMergeRule {
+
+    public static final RelOptRule INSTANCE = new 
FlinkProjectCalcMergeRule(Config.DEFAULT);
+
+    protected FlinkProjectCalcMergeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalProject project = call.rel(0);
+        LogicalCalc calc = call.rel(1);
+
+        List<RexNode> expandProjects =
+                calc.getProgram().getProjectList().stream()
+                        .map(p -> calc.getProgram().expandLocalRef(p))
+                        .collect(Collectors.toList());
+        InputRefVisitor inputRefVisitor = new InputRefVisitor();
+        project.getProjects().forEach(p -> p.accept(inputRefVisitor));
+        boolean existNonDeterministicRef =
+                Arrays.stream(inputRefVisitor.getFields())
+                        .anyMatch(i -> 
!RexUtil.isDeterministic(expandProjects.get(i)));

Review Comment:
   From a maintainability point of view, it is indeed better to use unified, 
reusable logic here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectMergeRule.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
+
+/**
+ * Extends calcite's FilterCalcMergeRule for streaming scenario, modification: 
does not merge the
+ * filter references field which generated by non-deterministic function.
+ */
+public class FlinkProjectMergeRule extends ProjectMergeRule {
+
+    public static final RelOptRule INSTANCE = new 
FlinkProjectMergeRule(Config.DEFAULT);
+
+    protected FlinkProjectMergeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final Project topProject = call.rel(0);
+        final Project bottomProject = call.rel(1);
+        if (FlinkRexUtil.isMergeable(topProject, bottomProject)) {
+            super.onMatch(call);
+        }
+    }

Review Comment:
   yes, will update



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to