Hi Satyam,

Currently the community is using the new table source/sink API; `FilterableTableSource` and `ProjectableTableSource` have been deprecated. `SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new interfaces for pushing down projections and filters.
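For example, a minimal sketch of a source implementing both new interfaces could look like the following (the class name and the `canPushDown` helper are hypothetical placeholders; the ability interfaces and their methods are the actual API):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public class MyTableSource implements ScanTableSource,
        SupportsFilterPushDown, SupportsProjectionPushDown {

    private List<ResolvedExpression> pushedFilters = new ArrayList<>();
    private int[][] projectedFields;

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Split the predicates: the ones the storage engine can evaluate
        // are accepted; the rest are returned so the planner keeps a
        // Filter node for them after the scan.
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canPushDown(filter)) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        this.pushedFilters = accepted;
        return Result.of(accepted, remaining);
    }

    private boolean canPushDown(ResolvedExpression filter) {
        // Connector-specific decision; this placeholder rejects everything.
        return false;
    }

    @Override
    public boolean supportsNestedProjection() {
        return false; // only top-level columns are projected
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        // Each int[] is a field index path; without nested projection
        // it is a single top-level field index.
        this.projectedFields = projectedFields;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // Build the actual reader from pushedFilters / projectedFields here.
        throw new UnsupportedOperationException("connector-specific");
    }

    @Override
    public DynamicTableSource copy() {
        MyTableSource copy = new MyTableSource();
        copy.pushedFilters = new ArrayList<>(pushedFilters);
        copy.projectedFields = projectedFields;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyTableSource";
    }
}
```

Note that a filter may appear in both lists of the `Result` if the source applies it only in a best-effort way and still needs the planner to re-apply it after the scan.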
You can refer to this class for more implementation details [1].

[1] https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java#L616

Satyam Shekhar <satyamshek...@gmail.com> wrote on Tue, Jan 12, 2021 at 3:54 PM:

> Hello,
>
> I am using Flink 1.11.2 as the execution engine for an alerting
> application. Our application builds atop Flink's SQL API to run streaming
> and batch jobs on a proprietary storage engine. We have a custom
> StreamTableSource implementation that connects to our storage engine. The
> connector currently implements the ProjectableTableSource interface. I now
> wish to extend the connector to push down filters to the source for
> improved performance. I have run into multiple issues in that effort:
>
> 1. The optimizer does not use both ProjectableTableSource and
> FilterableTableSource in a single query, even if the source implements
> both interfaces. Each interface works correctly if implemented
> independently.
>
> 2. Implementations of FilterableTableSource fail inside the optimizer for
> a few TPC-DS queries in batch mode.
>
> Stacktrace:
>
> java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
>   at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
>   at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
>   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
>   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
>   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
>   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
>   at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>   ...
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   ...
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>   ...
>   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>   ...
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
>
> Config:
>
> var settings = EnvironmentSettings.newInstance()
>     .useBlinkPlanner()
>     .inBatchMode()
>     .build();
> TableEnvironment.create(settings);
>
> 3. And finally, filter expressions containing the current_timestamp (and
> now) functions are not resolved to constant values during the predicate
> pushdown optimization. Take the following SQL query as an example:
> select count(*) from T0 where T0.C2 >= current_timestamp. Here, the
> applyPredicate method of FilterableTableSource receives the predicate as
> a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()).
> I'd have expected currentTimestamp to be resolved to a constant value that
> is identical across all usages of currentTimestamp in the query.
>
> Regards,
> Satyam