fsk119 commented on a change in pull request #15307: URL: https://github.com/apache/flink/pull/15307#discussion_r606677062
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.tools.RelBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Pushes a {@link org.apache.calcite.rel.logical.LogicalFilter} from the {@link org.apache.calcite.rel.logical.LogicalCalc} + * and into a {@link LogicalTableScan} + */ +public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { + public static final PushFilterInCalcIntoTableSourceScanRule INSTANCE = + new PushFilterInCalcIntoTableSourceScanRule(); + + public PushFilterInCalcIntoTableSourceScanRule() { + super( + operand(FlinkLogicalCalc.class, operand(FlinkLogicalTableSourceScan.class, none())), + "PushFilterInCalcIntoTableSourceScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + TableConfig config = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!config.getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) { + return false; + } + + FlinkLogicalCalc calc = call.rel(0); + RexProgram originProgram = 
calc.getProgram(); + + if (originProgram.getCondition() == null) { + return false; + } + + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + // we can not push filter twice + return canPushdownFilter(tableSourceTable); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalCalc calc = call.rel(0); + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class); + pushFilterIntoScan(call, calc, scan, table); + } + + private void pushFilterIntoScan( + RelOptRuleCall call, + FlinkLogicalCalc calc, + FlinkLogicalTableSourceScan scan, + FlinkPreparingTableBase relOptTable) { + + RexProgram originProgram = calc.getProgram(); + + RelBuilder relBuilder = call.builder(); + Tuple2<RexNode[], RexNode[]> tuple2 = extractPredicates( + originProgram.getInputRowType().getFieldNames().toArray(new String[0]), + originProgram.expandLocalRef(originProgram.getCondition()), + scan, + relBuilder.getRexBuilder()); + + RexNode[] convertiblePredicates = tuple2._1; + if (convertiblePredicates.length == 0) { + // no condition can be translated to expression + return; + } + + Tuple2<SupportsFilterPushDown.Result, FlinkLogicalTableSourceScan> pushdownResultWithScan = createTableScanAfterPushdown( + convertiblePredicates, + relOptTable.unwrap(TableSourceTable.class), + scan, + relBuilder); + + SupportsFilterPushDown.Result result = pushdownResultWithScan._1; + FlinkLogicalTableSourceScan newScan = pushdownResultWithScan._2; + + RexNode[] unconvertedPredicates = tuple2._2; + // check whether framework still need to do a filter + if (result.getRemainingFilters().isEmpty() && unconvertedPredicates.length == 0) { + call.transformTo(newScan); + } else { + List<RexNode> remainingPredicates = + convertExpressionToRexNode(result.getRemainingFilters(), relBuilder); + remainingPredicates.addAll(Arrays.asList(unconvertedPredicates)); + 
RexNode remainingCondition = relBuilder.and(remainingPredicates); Review comment: Add a method to reuse these codes ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java ########## @@ -29,10 +29,9 @@ import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.tools.RuleSets; -/** Test for {@link PushFilterIntoTableSourceScanRule}. */ +/** Test for {@link PushFilterIntoTableSourceScanRule} */ Review comment: Revert. ########## File path: flink-table/flink-table-runtime-blink/.java-version ########## @@ -0,0 +1 @@ +1.8 Review comment: Why add these? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.tools.RelBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Pushes a {@link org.apache.calcite.rel.logical.LogicalFilter} from the {@link org.apache.calcite.rel.logical.LogicalCalc} + * and into a {@link LogicalTableScan} + */ +public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { + public static final PushFilterInCalcIntoTableSourceScanRule INSTANCE = + new PushFilterInCalcIntoTableSourceScanRule(); + + public PushFilterInCalcIntoTableSourceScanRule() { + super( + operand(FlinkLogicalCalc.class, operand(FlinkLogicalTableSourceScan.class, none())), + "PushFilterInCalcIntoTableSourceScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + TableConfig config = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!config.getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) { + return false; + } Review comment: Add these codes to `canPushdownFilter`? 
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FilterableSourceITCase.scala ########## @@ -0,0 +1,97 @@ +package org.apache.flink.table.planner.runtime.stream.sql + +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.table.api.bridge.scala.tableConversions +import org.apache.flink.table.planner.factories.TestValuesTableFactory +import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestingAppendSink} +import org.apache.flink.table.utils.LegacyRowResource +import org.apache.flink.types.Row +import org.junit.Assert.assertEquals +import org.junit.{Rule, Test} + +import java.time.LocalDateTime + +class FilterableSourceITCase extends StreamingTestBase { Review comment: add java doc ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.tools.RelBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; + +/** + * Pushes a {@link org.apache.calcite.rel.logical.LogicalFilter} from the {@link org.apache.calcite.rel.logical.LogicalCalc} + * and into a {@link LogicalTableScan} + */ +public class PushFilterInCalcIntoTableSourceScanRule extends PushFilterIntoSourceScanRuleBase { + public static final PushFilterInCalcIntoTableSourceScanRule INSTANCE = + new PushFilterInCalcIntoTableSourceScanRule(); + + public PushFilterInCalcIntoTableSourceScanRule() { + super( + operand(FlinkLogicalCalc.class, operand(FlinkLogicalTableSourceScan.class, none())), + "PushFilterInCalcIntoTableSourceScanRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + TableConfig config = ShortcutUtils.unwrapContext(call.getPlanner()).getTableConfig(); + if (!config.getConfiguration() + .getBoolean( + OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED)) { + return false; + } + + FlinkLogicalCalc calc = call.rel(0); + RexProgram originProgram = 
calc.getProgram(); + + if (originProgram.getCondition() == null) { + return false; + } + + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + // we can not push filter twice + return canPushdownFilter(tableSourceTable); + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalCalc calc = call.rel(0); + FlinkLogicalTableSourceScan scan = call.rel(1); + TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class); + pushFilterIntoScan(call, calc, scan, table); + } + + private void pushFilterIntoScan( + RelOptRuleCall call, + FlinkLogicalCalc calc, + FlinkLogicalTableSourceScan scan, + FlinkPreparingTableBase relOptTable) { + + RexProgram originProgram = calc.getProgram(); + + RelBuilder relBuilder = call.builder(); + Tuple2<RexNode[], RexNode[]> tuple2 = extractPredicates( Review comment: Use a meaningful name, e.g `extractedPredicates` ? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; +import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND; + +public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule { + public PushFilterIntoSourceScanRuleBase( + RelOptRuleOperand operand, + String description) { + super(operand, description); + } + + protected 
List<RexNode> convertExpressionToRexNode( + List<ResolvedExpression> expressions, RelBuilder relBuilder) { + ExpressionConverter exprConverter = new ExpressionConverter(relBuilder); + return expressions.stream().map(e -> e.accept(exprConverter)).collect(Collectors.toList()); + } + + protected Tuple2<SupportsFilterPushDown.Result, FlinkLogicalTableSourceScan> createTableScanAfterPushdown( + RexNode[] convertiblePredicates, + TableSourceTable oldTableSourceTable, + TableScan scan, + RelBuilder relBuilder) { + // record size before applyFilters for update statistics + int originPredicatesSize = convertiblePredicates.length; + + // update DynamicTableSource + DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy(); + + SupportsFilterPushDown.Result result = + FilterPushDownSpec.apply( + Arrays.asList(convertiblePredicates), + newTableSource, + SourceAbilityContext.from(scan)); + + relBuilder.push(scan); + List<RexNode> acceptedPredicates = + convertExpressionToRexNode(result.getAcceptedFilters(), relBuilder); + FilterPushDownSpec filterPushDownSpec = new FilterPushDownSpec(acceptedPredicates); + + // record size after applyFilters for update statistics + int updatedPredicatesSize = result.getRemainingFilters().size(); + // set the newStatistic newTableSource and extraDigests + TableSourceTable newTableSourceTable = + oldTableSourceTable.copy( + newTableSource, + getNewFlinkStatistic( + oldTableSourceTable, originPredicatesSize, updatedPredicatesSize), + getNewExtraDigests(result.getAcceptedFilters()), + new SourceAbilitySpec[]{filterPushDownSpec}); + + return new Tuple2<>(result, FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable)); + } + + Review comment: Use a blank line ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/FilterableSourceITCase.scala ########## @@ -0,0 +1,97 @@ +package org.apache.flink.table.planner.runtime.stream.sql Review comment: 
add license ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala ########## @@ -363,4 +363,35 @@ class CalcITCase extends StreamingTestBase { List("1,HI,1111,true,111","2,HELLO,2222,false,222", "3,HELLO WORLD,3333,true,333") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testProjectWithWatermarkFilterPushdown(): Unit = { + val myTableDataId = TestValuesTableFactory.registerData(TestData.data3WithTimestamp) + val ddl = + s""" + |CREATE TABLE TableWithWatermark ( + | a int, + | b bigint, + | c string, + | d timestamp(3), + | WATERMARK FOR d as d + |) WITH ( + | 'connector' = 'values', + | 'filterable-fields' = 'a,b,c', + | 'enable-watermark-push-down' = 'true', + | 'data-id' = '$myTableDataId', + | 'bounded' = 'false', + | 'disable-lookup' = 'true' + |) + """.stripMargin + tEnv.executeSql(ddl) + + val result = tEnv.sqlQuery( "select a, c from TableWithWatermark WHERE LOWER(c) = 'hello'").toAppendStream[Row] + val sink = new TestingAppendSink + result.addSink(sink) + env.execute() + + val expected = List("2,Hello") Review comment: move this test to `FilterableSourceTest`? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.plan.nodes.FlinkConventions; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.rel.rules.CoreRules; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +/** Test for {@link PushFilterInCalcIntoTableSourceRuleTest}. */ +public class PushFilterInCalcIntoTableSourceRuleTest extends TableTestBase { + Review comment: I think the tests are not enough to cover all situations. 
I think we should add cases that: - verify this rule works with and without a watermark assigner - verify this rule works when the filter is partially or fully pushed down - verify this rule pushes down the computed column - verify the rule with a UDF ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; +import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.plan.utils.RexNodeExtractor; +import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter; +import org.apache.flink.table.planner.utils.ShortcutUtils; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Tuple2; + +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND; + +public abstract class PushFilterIntoSourceScanRuleBase extends RelOptRule { Review comment: Add java doc -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org