Hi Satyam,

Currently, the community is using the new table source/sink API, and
`FilterableTableSource` and `ProjectableTableSource` have been deprecated.
`SupportsProjectionPushDown` and `SupportsFilterPushDown` are the new
interfaces for pushing down projections and filters.

You can refer to this class for more implementation details [1].

[1]
https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java#L616
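
As a rough sketch of how the two new abilities fit together: a source on the
new stack implements ScanTableSource plus the two interfaces. The class name
MyTableSource and the canHandle helper below are hypothetical, and the
runtime provider is stubbed out; see the test factory [1] for a full
implementation.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public class MyTableSource implements ScanTableSource,
        SupportsProjectionPushDown, SupportsFilterPushDown {

    // fields the planner asked us to read, and filters we agreed to evaluate
    private int[][] projectedFields;
    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public boolean supportsNestedProjection() {
        return false; // this sketch only projects top-level columns
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        this.projectedFields = projectedFields;
    }

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canHandle(filter)) { // hypothetical helper
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        this.pushedFilters = accepted;
        // filters left in `remaining` are still applied by Flink after the scan
        return Result.of(accepted, remaining);
    }

    private boolean canHandle(ResolvedExpression filter) {
        // a real connector would inspect the expression tree here and decide
        // whether the storage engine can evaluate it
        return false;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // a real connector would build its reader from projectedFields and
        // pushedFilters here
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        MyTableSource copy = new MyTableSource();
        copy.projectedFields = projectedFields;
        copy.pushedFilters = new ArrayList<>(pushedFilters);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "MyTableSource";
    }
}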

Satyam Shekhar <satyamshek...@gmail.com> wrote on Tue, Jan 12, 2021 at 3:54 PM:

> Hello,
>
> I am using Flink 1.11.2 as the execution engine for an alerting
> application. Our application builds atop Flink's SQL API to run streaming
> and batch jobs on a proprietary storage engine. We have a custom
> StreamTableSource implementation that connects to our storage engine. The
> connector currently implements the ProjectableTableSource interface. I
> now wish to extend the connector to push down filters to the source for
> improved performance. I have run into multiple issues in that effort -
>
> 1. The optimizer does not use both ProjectableTableSource and
> FilterableTableSource in a single query, even if the source implements
> both interfaces. Each interface works correctly when implemented
> independently.
>
> 2. Implementations of FilterableTableSource fail inside the optimizer for
> a few TPC-DS queries in batch mode.
>
> Stacktrace:
>
> java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16,
> _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
>    at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
>    at
> org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
>    at
> org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
>    at
> org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
>    at
> org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
>    at
> org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
>    at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
>    ...
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>    ...
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>    ...
>    at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>    at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
>    at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org
> $apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>    at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>    at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
>    at scala.collection.immutable.List.foreach(List.scala:392)
>    at
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
>    at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>    at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>    ...
>    at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
>
>
> Config:
>   var settings = EnvironmentSettings.newInstance()
>                                   .useBlinkPlanner()
>                                   .inBatchMode()
>                                   .build();
>   TableEnvironment.create(settings);
>
> 3. And finally, filter expressions containing the current_timestamp (and
> now) functions are not resolved to constant values during predicate
> pushdown optimization. Take the following SQL query as an example: select
> count(*) from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate
> method of FilterableTableSource receives the predicate as a CallExpression
> of the form greaterThanOrEqual(C2, currentTimestamp()). I'd have expected
> currentTimestamp to be resolved to a constant value that is identical
> across all usages of currentTimestamp in the query.
>
> Regards,
> Satyam
>
