Hello,

I am using Flink 1.11.2 as the execution engine for an alerting application. Our application builds atop Flink's SQL API to run streaming and batch jobs on a proprietary storage engine. We have a custom StreamTableSource implementation that connects to our storage engine. The connector currently implements the ProjectableTableSource interface. I now wish to extend the connector to push down filters to the source for improved performance. I have run into multiple issues in that effort:

1. The optimizer does not use both ProjectableTableSource and FilterableTableSource in a single query, even if the source implements both interfaces. Each interface works correctly if implemented independently.

2. Implementations of FilterableTableSource fail inside the optimizer for a few TPC-DS queries in batch mode.

Stacktrace:

java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
    at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
    at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
    at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
    at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
    at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
    at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
    ...
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
    ...
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    ...
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
    ...
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)

Config:

    var settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
    TableEnvironment.create(settings);

3. Finally, filter expressions containing the current timestamp (and now) functions are not resolved to constant values during predicate pushdown in the optimizer. Take the following SQL query as an example:

    select count(*) from T0 where T0.C2 >= current_timestamp

Here, the applyPredicate method of FilterableTableSource receives the predicate as a CallExpression of the form greaterThanOrEqual(C2, currentTimestamp()). I would have expected currentTimestamp to be resolved to a constant value that is identical across all usages of currentTimestamp in the query.

Regards,
Satyam