fsk119 commented on a change in pull request #15307: URL: https://github.com/apache/flink/pull/15307#discussion_r608392756
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.rel.rules.CoreRules; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +/** Test for {@link PushFilterInCalcIntoTableSourceRuleTest}. 
*/ +public class PushFilterInCalcIntoTableSourceRuleTest extends PushFilterIntoTableSourceScanRuleTest { + + @Before + public void setup() { + util_$eq(streamTestUtil(new TableConfig())); + + FlinkChainedProgram<StreamOptimizeContext> program = new FlinkChainedProgram<>(); + program.addLast( + "Converters", + FlinkVolcanoProgramBuilder.<StreamOptimizeContext>newBuilder() + .add( + RuleSets.ofList( + CoreRules.PROJECT_TO_CALC, + CoreRules.FILTER_TO_CALC, + FlinkCalcMergeRule$.MODULE$.INSTANCE(), + FlinkLogicalCalc.CONVERTER(), + FlinkLogicalTableSourceScan.CONVERTER(), + FlinkLogicalWatermarkAssigner.CONVERTER())) + .setRequiredOutputTraits(new Convention[] {FlinkConventions.LOGICAL()}) + .build()); + program.addLast( + "Filter push in calc down", + FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder() + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE()) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList(PushFilterInCalcIntoTableSourceScanRule.INSTANCE)) + .build()); + ((StreamTableTestUtil) util()).replaceStreamProgram(program); + + String ddl1 = + "CREATE TABLE MyTable (\n" + + " name STRING,\n" + + " id bigint,\n" + + " amount int,\n" + + " price double\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'amount',\n" + + " 'bounded' = 'true'\n" + + ")"; + util().tableEnv().executeSql(ddl1); + + String ddl2 = + "CREATE TABLE VirtualTable (\n" + + " name STRING,\n" + + " id bigint,\n" + + " amount int,\n" + + " virtualField as amount + 1,\n" + + " price double\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'filterable-fields' = 'amount',\n" + + " 'bounded' = 'true'\n" + + ")"; + + util().tableEnv().executeSql(ddl2); + Review comment: nit: Move to a protected method, e.g. ``` protected void registerTables() { String ddl1 = "CREATE TABLE MyTable (\n" + " name STRING,\n" + " id bigint,\n" + " amount int,\n" + " price double\n" + ") WITH (\n" + " 'connector' = 'values',\n" + " 'filterable-fields' = 'amount',\n" + " 'bounded' = 'true'\n" + ")"; util().tableEnv().executeSql(ddl1); String ddl2 = "CREATE TABLE VirtualTable (\n" + " name STRING,\n" + " id bigint,\n" + " amount int,\n" + " virtualField as amount + 1,\n" + " price double\n" + ") WITH (\n" + " 'connector' = 'values',\n" + " 'filterable-fields' = 'amount',\n" + " 'bounded' = 'true'\n" + ")"; util().tableEnv().executeSql(ddl2); } ``` We can reuse this code in `PushFilterInCalcIntoTableSourceRuleTest` and `PushFilterIntoTableSourceRuleTest` (see the reuse sketch below). ########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml ########## @@ -185,8 +185,7 @@ LogicalProject(a=[$0], b=[$1]) </Resource> <Resource name="optimized rel plan"> <![CDATA[ -LogicalProject(a=[$0], b=[$1]) -+- LogicalTableScan(table=[[default_catalog, default_database, MTable, source: [filterPushedDown=[true], filter=[and(equals(lower(a), 'foo'), equals(upper(b), 'bar'))]]]]) +LegacyTableSourceScan(table=[[default_catalog, default_database, MTable, source: [filterPushedDown=[true], filter=[and(equals(lower(a), 'foo'), equals(upper(b), 'bar'))]]]], fields=[a, b]) Review comment: Why does the plan change? We only modify the stream rule set.
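Returning to the `registerTables()` nit above: for illustration, a hedged sketch of the intended reuse, assuming the helper becomes a `protected` method on `PushFilterIntoTableSourceScanRuleTest`. This is a hypothetical shape of the refactoring, not the PR's final code:
```
// Hypothetical sketch: with a protected registerTables() in the parent test,
// the subclass setup only installs its own optimize program and delegates
// the shared DDL registration.
public class PushFilterInCalcIntoTableSourceRuleTest
        extends PushFilterIntoTableSourceScanRuleTest {

    @Before
    public void setup() {
        util_$eq(streamTestUtil(new TableConfig()));

        FlinkChainedProgram<StreamOptimizeContext> program = new FlinkChainedProgram<>();
        // ... addLast("Converters", ...) and addLast("Filter push in calc down", ...)
        // exactly as in the diff above ...
        ((StreamTableTestUtil) util()).replaceStreamProgram(program);

        registerTables(); // shared DDL registration inherited from the parent test
    }
}
```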
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala ########## @@ -151,6 +151,7 @@ class PushFilterIntoLegacyTableSourceScanRuleTest extends TableTestBase { @Test def testLowerUpperPushdown(): Unit = { + util = batchTestUtil() Review comment: remove ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala ########## @@ -171,6 +172,7 @@ class PushFilterIntoLegacyTableSourceScanRuleTest extends TableTestBase { @Test def testWithInterval(): Unit = { + util = batchTestUtil() Review comment: remove ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.tools.RelBuilder; + +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Pushes a {@link LogicalFilter} from the {@link LogicalCalc} and into a {@link LogicalTableScan}. 
+ */ +public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { + public static final PushFilterInCalcIntoTableSourceScanRule INSTANCE = + new PushFilterInCalcIntoTableSourceScanRule(); + + public PushFilterInCalcIntoTableSourceScanRule() { + super( + operand(Calc.class, operand(FlinkLogicalTableSourceScan.class, none())), + "PushFilterInCalcIntoTableSourceScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + super.matches(call); + + Calc calc = call.rel(0); + RexProgram originProgram = calc.getProgram(); + + if (originProgram.getCondition() == null) { + return false; + } + + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + // we can not push filter twice + return canPushdownFilter(tableSourceTable); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Calc calc = call.rel(0); + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class); + pushFilterIntoScan(call, calc, scan, table); + } + + private void pushFilterIntoScan( + RelOptRuleCall call, + Calc calc, + FlinkLogicalTableSourceScan scan, + FlinkPreparingTableBase relOptTable) { + + RexProgram originProgram = calc.getProgram(); + + RelBuilder relBuilder = call.builder(); + Tuple2<RexNode[], RexNode[]> extractedPredicates = + extractPredicates( + originProgram.getInputRowType().getFieldNames().toArray(new String[0]), + originProgram.expandLocalRef(originProgram.getCondition()), + scan, + relBuilder.getRexBuilder()); + + RexNode[] convertiblePredicates = extractedPredicates._1; + if (convertiblePredicates.length == 0) { + // no condition can be translated to expression + return; + } + + Tuple2<SupportsFilterPushDown.Result, TableSourceTable> pushdownResultWithScan = + createTableScanAfterPushdown( + convertiblePredicates, + relOptTable.unwrap(TableSourceTable.class), + scan, + relBuilder); + + SupportsFilterPushDown.Result result = pushdownResultWithScan._1; + TableSourceTable tableSourceTable = pushdownResultWithScan._2; + + FlinkLogicalTableSourceScan newScan = + FlinkLogicalTableSourceScan.create(scan.getCluster(), tableSourceTable); + + // build new calc program + RexProgramBuilder programBuilder = + new RexProgramBuilder(newScan.getRowType(), call.builder().getRexBuilder()); + + if (!result.getRemainingFilters().isEmpty()) { + RexNode[] unconvertedPredicates = extractedPredicates._2; + RexNode remainingCondition = + getRemainingConditions(relBuilder, result, unconvertedPredicates); + + programBuilder.addCondition(remainingCondition); + } Review comment: The guard should be the negation of `result.getRemainingFilters().isEmpty() && extractedPredicates._2.length == 0`; as written, the unconverted predicates are dropped whenever the source accepts every pushed filter (see the combined sketch at the end of this message). ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/FilterableSourceTest.scala ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 +import org.apache.flink.table.planner.utils.TableTestBase +import org.junit.{Before, Test} + +/** + * Tests for pushing filter into table scan + */ +class FilterableSourceTest extends TableTestBase { + private val util = streamTestUtil() + + @Before + def setup(): Unit = { + val ddl = + """ + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | d STRING, + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'a;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + | ) + |""".stripMargin + + util.tableEnv.executeSql(ddl) + } + + @Test + def testFullFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'hello'") + } + + @Test + def testPartialFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'h' AND d IS NOT NULL") + } + + @Test + def testNoFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE b > 5") + } + + @Test def testFullPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = Review comment: rename to `ddl` ########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml ########## @@ -35,14 +35,33 @@ LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3]) ]]> </Resource> </TestCase> - <TestCase name="testCannotPushDownWithVirtualColumn"> + <TestCase name="testCannotPushDown3"> <Resource name="sql"> - <![CDATA[SELECT * FROM VirtualTable WHERE price > 10]]> + <![CDATA[SELECT * FROM MyTable WHERE amount > 2 OR amount < 10]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3]) ++- LogicalFilter(condition=[OR(>($2, 2), <($2, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3]) ++- LogicalFilter(condition=[OR(>($2, 2), <($2, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, filter=[]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testCannotPushDown3WithVirtualColumn"> + <Resource name="sql"> + <![CDATA[SELECT * FROM VirtualTable WHERE amount > 2 OR amount < 10]]> Review comment: revert the modification of the file ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/FilterableSourceTest.scala ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 +import org.apache.flink.table.planner.utils.TableTestBase +import org.junit.{Before, Test} + +/** + * Tests for pushing filter into table scan + */ +class FilterableSourceTest extends TableTestBase { + private val util = streamTestUtil() + + @Before + def setup(): Unit = { + val ddl = + """ + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | d STRING, + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'a;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + | ) + |""".stripMargin + + util.tableEnv.executeSql(ddl) + } + + @Test + def testFullFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'hello'") + } + + @Test + def testPartialFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'h' AND d IS NOT NULL") + } + + @Test + def testNoFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE b > 5") + } + + @Test def testFullPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = + """ + |CREATE TABLE NoWatermark ( + | name STRING, + | event_time TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'name', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan( + "SELECT * FROM NoWatermark WHERE LOWER(name) = 'foo' AND UPPER(name) = 'FOO'" + ) + } + + @Test + def testPartialPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = Review comment: rename to ddl. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FilterableSourceITCase.scala ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.stream.sql + +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.table.api.bridge.scala.tableConversions +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestData, TestingAppendSink} +import org.apache.flink.table.utils.LegacyRowResource +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.{Rule, Test} + +import java.time.LocalDateTime + +/** + * Tests for pushing filters into a table scan + */ +class FilterableSourceITCase extends StreamingTestBase { + + @Rule + def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE + + @Test + def testFilterPushdown(): Unit = { + val data = Seq( + row(1, 2L, LocalDateTime.parse("2020-11-21T19:00:05.23")), + row(2, 3L, LocalDateTime.parse("2020-11-21T21:00:05.23")) + ) + + val dataId = TestValuesTableFactory.registerData(data) + val ddl = + s""" + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'a;c;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true', + | 'data-id' = '$dataId' + | ) + |""".stripMargin + + tEnv.executeSql(ddl) + + val query = "SELECT * FROM MyTable WHERE a > 1" + val expectedData = Seq("2,3,2020-11-21T21:00:05.230") + + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink() + result.addSink(sink) + + env.execute() + assertEquals(expectedData.sorted, sink.getAppendResults.sorted) + } + + @Test + def testWithRejectedFilter(): Unit = { + val data = Seq( + row(1, 2L, LocalDateTime.parse("2020-11-21T19:00:05.23")), + row(2, 3L, LocalDateTime.parse("2020-11-21T21:00:05.23")) + ) + + val dataId = TestValuesTableFactory.registerData(data) + + // Reject the filter by leaving out 'a' from 'filterable-fields' + val ddl = + s""" + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'c;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true', + | 'data-id' = '$dataId' + | ) + |""".stripMargin + + tEnv.executeSql(ddl) + + val query = "SELECT * FROM MyTable WHERE a > 1" + val expectedData = Seq("2,3,2020-11-21T21:00:05.230") + + val result = tEnv.sqlQuery(query).toAppendStream[Row] + val sink = new TestingAppendSink() + result.addSink(sink) + + env.execute() + assertEquals(expectedData.sorted, sink.getAppendResults.sorted) + } + + @Test + def testProjectWithWatermarkFilterPushdown(): Unit = { + val myTableDataId = TestValuesTableFactory.registerData(TestData.data3WithTimestamp) + val ddl = + s""" + |CREATE TABLE TableWithWatermark ( + | a int, + | b bigint, + | c string, + | d timestamp(3), + | WATERMARK FOR d as d + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'a,b,c', Review comment: Use `;`. 
########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java ########## @@ -18,24 +18,27 @@ package org.apache.flink.table.planner.plan.rules.logical; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.planner.calcite.CalciteConfig; import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext; import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram; import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.calcite.plan.hep.HepMatchOrder; import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.tools.RuleSets; +import org.junit.Test; /** Test for {@link PushFilterIntoTableSourceScanRule}. */ public class PushFilterIntoTableSourceScanRuleTest extends PushFilterIntoLegacyTableSourceScanRuleTest { - Review comment: Revert. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/FilterableSourceTest.scala ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.stream.sql + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 +import org.apache.flink.table.planner.utils.TableTestBase +import org.junit.{Before, Test} + +/** + * Tests for pushing filter into table scan + */ +class FilterableSourceTest extends TableTestBase { + private val util = streamTestUtil() + + @Before + def setup(): Unit = { + val ddl = + """ + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | d STRING, + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'a;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + | ) + |""".stripMargin + + util.tableEnv.executeSql(ddl) + } + + @Test + def testFullFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'hello'") + } + + @Test + def testPartialFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'h' AND d IS NOT NULL") + } + + @Test + def testNoFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE b > 5") + } + + @Test def testFullPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = + """ + |CREATE TABLE NoWatermark ( + | name STRING, + | event_time TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'name', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan( + "SELECT * FROM NoWatermark WHERE LOWER(name) = 'foo' AND UPPER(name) = 'FOO'" + ) + } + + @Test + def testPartialPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = + """ + |CREATE TABLE NoWatermark ( + | name STRING, + | event_time TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'name', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan("SELECT * FROM NoWatermark WHERE LOWER(name) = 'foo' AND name IS NOT NULL") + } + + @Test + def testComputedColumnPushdownAcrossWatermark() { + val ddl3 = Review comment: ditto ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java ########## @@ -82,8 +85,8 @@ public void setup() { util().tableEnv().executeSql(ddl2); } - @Override - public void testLowerUpperPushdown() { + @Test Review comment: Why rename? Please revert the change ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/FilterableSourceTest.scala ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc5 +import org.apache.flink.table.planner.utils.TableTestBase +import org.junit.{Before, Test} + +/** + * Tests for pushing filter into table scan + */ +class FilterableSourceTest extends TableTestBase { + private val util = streamTestUtil() + + @Before + def setup(): Unit = { + val ddl = + """ + | CREATE TABLE MyTable( + | a INT, + | b BIGINT, + | c TIMESTAMP(3), + | d STRING, + | WATERMARK FOR c AS c + | ) WITH ( + | 'connector' = 'values', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'a;d', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + | ) + |""".stripMargin + + util.tableEnv.executeSql(ddl) + } + + @Test + def testFullFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'hello'") + } + + @Test + def testPartialFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE LOWER(d) = 'h' AND d IS NOT NULL") + } + + @Test + def testNoFilterMatchWithWatermark(): Unit = { + util.verifyExecPlan("SELECT * FROM MyTable WHERE b > 5") + } + + @Test def testFullPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = + """ + |CREATE TABLE NoWatermark ( + | name STRING, + | event_time TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'name', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan( + "SELECT * FROM NoWatermark WHERE LOWER(name) = 'foo' AND UPPER(name) = 'FOO'" + ) + } + + @Test + def testPartialPushdownWithoutWatermarkAssigner(): Unit = { + val ddl3 = + """ + |CREATE TABLE NoWatermark ( + | name STRING, + | event_time TIMESTAMP(3) + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'name', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan("SELECT * FROM NoWatermark WHERE LOWER(name) = 'foo' AND name IS NOT NULL") + } + + @Test + def testComputedColumnPushdownAcrossWatermark() { + val ddl3 = + """ + |CREATE TABLE WithWatermark ( + | event_time TIMESTAMP(3), + | name STRING, + | lowercase_name AS LOWER(name), + | WATERMARK FOR event_time AS event_time + |) WITH ( + | 'connector' = 'values', + | 'bounded' = 'false', + | 'enable-watermark-push-down' = 'true', + | 'filterable-fields' = 'name', + | 'disable-lookup' = 'true' + |) + |""".stripMargin + + util.tableEnv.executeSql(ddl3) + util.verifyExecPlan( + "SELECT * FROM WithWatermark WHERE lowercase_name = 'foo'") + } + + @Test + def testFilterPushdownWithUdf(): Unit = { + JavaFunc5.closeCalled = false + JavaFunc5.openCalled = false + util.tableEnv.createTemporarySystemFunction("func", new JavaFunc5) + val ddl3 = Review comment: ditto ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala ########## @@ -17,27 +17,27 @@ */ package org.apache.flink.table.planner.plan.rules.logical +import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.CoreRules +import org.apache.calcite.tools.RuleSets import org.apache.flink.table.api.{DataTypes, TableSchema} import org.apache.flink.table.planner.expressions.utils.Func1 import 
org.apache.flink.table.planner.plan.optimize.program.{FlinkBatchProgram, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime -import org.apache.flink.table.planner.utils.{TableConfigUtils, TableTestBase, TestLegacyFilterableTableSource} Review comment: Revert? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Calc; +import org.apache.calcite.rel.logical.LogicalCalc; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.tools.RelBuilder; + +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Pushes a {@link LogicalFilter} from the {@link LogicalCalc} and into a {@link LogicalTableScan}. 
+ */ +public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { + public static final PushFilterInCalcIntoTableSourceScanRule INSTANCE = + new PushFilterInCalcIntoTableSourceScanRule(); + + public PushFilterInCalcIntoTableSourceScanRule() { + super( + operand(Calc.class, operand(FlinkLogicalTableSourceScan.class, none())), + "PushFilterInCalcIntoTableSourceScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + super.matches(call); + + Calc calc = call.rel(0); + RexProgram originProgram = calc.getProgram(); + + if (originProgram.getCondition() == null) { + return false; + } + + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + // we can not push filter twice + return canPushdownFilter(tableSourceTable); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Calc calc = call.rel(0); + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class); + pushFilterIntoScan(call, calc, scan, table); + } + + private void pushFilterIntoScan( + RelOptRuleCall call, + Calc calc, + FlinkLogicalTableSourceScan scan, + FlinkPreparingTableBase relOptTable) { + + RexProgram originProgram = calc.getProgram(); + + RelBuilder relBuilder = call.builder(); + Tuple2<RexNode[], RexNode[]> extractedPredicates = + extractPredicates( + originProgram.getInputRowType().getFieldNames().toArray(new String[0]), + originProgram.expandLocalRef(originProgram.getCondition()), + scan, + relBuilder.getRexBuilder()); + + RexNode[] convertiblePredicates = extractedPredicates._1; + if (convertiblePredicates.length == 0) { + // no condition can be translated to expression + return; + } + + Tuple2<SupportsFilterPushDown.Result, TableSourceTable> pushdownResultWithScan = + createTableScanAfterPushdown( + convertiblePredicates, + relOptTable.unwrap(TableSourceTable.class), + scan, + relBuilder); + + SupportsFilterPushDown.Result result = pushdownResultWithScan._1; + TableSourceTable tableSourceTable = pushdownResultWithScan._2; + + FlinkLogicalTableSourceScan newScan = + FlinkLogicalTableSourceScan.create(scan.getCluster(), tableSourceTable); + + // build new calc program + RexProgramBuilder programBuilder = + new RexProgramBuilder(newScan.getRowType(), call.builder().getRexBuilder()); + + if (!result.getRemainingFilters().isEmpty()) { + RexNode[] unconvertedPredicates = extractedPredicates._2; + RexNode remainingCondition = + getRemainingConditions(relBuilder, result, unconvertedPredicates); + + programBuilder.addCondition(remainingCondition); + } + + List<RexNode> projects = + originProgram.getProjectList().stream() + .map(originProgram::expandLocalRef) + .collect(Collectors.toList()); + List<String> outputFieldNames = originProgram.getOutputRowType().getFieldNames(); + + for (int i = 0; i < projects.size(); i++) { + programBuilder.addProject(projects.get(i), outputFieldNames.get(i)); + } + + Calc newCalc = calc.copy(calc.getTraitSet(), newScan, programBuilder.getProgram()); + call.transformTo(newCalc); Review comment: If the remaining filters are empty and `originProgram.getProjectList()` is empty, we don't need to build a new calc (see the combined sketch below).
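For illustration, a hedged sketch combining this comment with the earlier guard suggestion on the same method. This is an assumed shape of the fix, not the PR's final code; it reuses `pushFilterIntoScan`'s local names (`result`, `extractedPredicates`, `originProgram`, `programBuilder`, `newScan`, `relBuilder`) and would replace the tail of the method:
```
// Hypothetical sketch, not the PR's final code: fold both review suggestions
// into the tail of pushFilterIntoScan().
RexNode[] unconvertedPredicates = extractedPredicates._2;
boolean fullyConsumed =
        result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0;

if (!fullyConsumed) {
    // some predicates stayed behind (rejected by the source or not convertible),
    // so they must remain as a condition on top of the new scan
    RexNode remainingCondition =
            getRemainingConditions(relBuilder, result, unconvertedPredicates);
    programBuilder.addCondition(remainingCondition);
} else if (originProgram.getProjectList().isEmpty()) {
    // nothing left to filter and nothing to project: skip building a new Calc
    // and let the new scan replace the matched subtree directly
    call.transformTo(newScan);
    return;
}

// otherwise fall through and copy the projections into the new Calc, as before
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org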