Hello,
I am using Flink 1.11.2 as the execution engine for an alerting
application. Our application builds atop Flink's SQL API to run streaming
and batch jobs on a proprietary storage engine. We have a custom
StreamTableSource implementation that connects to our storage engine. The
connector currently implements the ProjectableTableSource interface. I now
wish to extend the connector to push down filters to the source for
improved performance. I have run into several issues in that effort:
1. The optimizer does not use both ProjectableTableSource and
FilterableTableSource in a single query, even if the source implements both
interfaces. Each interface works correctly when implemented on its own.
2. With a FilterableTableSource implementation, the optimizer fails on a
few TPC-DS queries in batch mode.
Stack trace:
java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
	at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
	at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
	at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
	at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
	at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
	at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
	...
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
	...
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	...
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
	...
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
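Config:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

var settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);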
3. Finally, filter expressions containing the current_timestamp (and now)
functions are not resolved to constant values during predicate push-down.
Take the following query as an example:
select count(*) from T0 where T0.C2 >= current_timestamp
Here, the applyPredicate method of FilterableTableSource receives the
predicate as a CallExpression of the form
greaterThanOrEqual(C2, currentTimestamp()). I would have expected
currentTimestamp to be resolved to a constant value that is identical across
all usages of currentTimestamp in the query.
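Regarding item 1: one thing I am double-checking on my side is that each push-down returns a new source that keeps the state of the other push-down, and that explainSource() mentions both the projected fields and the accepted predicates (as far as I understand the legacy source docs, the planner relies on that string to tell a pushed-down copy apart from the original). Below is a simplified, self-contained model of that pattern. SourceState and its methods are hypothetical; they only mirror the shape of ProjectableTableSource.projectFields(int[]) and FilterableTableSource.applyPredicate(List<Expression>), with predicates modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a legacy table source that supports both push-downs.
// Every push-down returns a NEW immutable copy that carries the other push-down's state.
final class SourceState {
    private final int[] projectedFields;           // null means "all fields"
    private final List<String> pushedPredicates;   // predicates the storage engine will evaluate
    private final boolean filterPushedDown;
    private final Predicate<String> supported;     // hypothetical capability check of the engine

    private SourceState(int[] projectedFields, List<String> pushedPredicates,
                        boolean filterPushedDown, Predicate<String> supported) {
        this.projectedFields = projectedFields;
        this.pushedPredicates = pushedPredicates;
        this.filterPushedDown = filterPushedDown;
        this.supported = supported;
    }

    static SourceState initial(Predicate<String> supported) {
        return new SourceState(null, Collections.emptyList(), false, supported);
    }

    // Mirrors projectFields(int[]): the already accepted predicates are kept.
    SourceState projectFields(int[] fields) {
        return new SourceState(fields.clone(), pushedPredicates, filterPushedDown, supported);
    }

    // Mirrors applyPredicate(List): removes accepted predicates from the caller's list
    // (the rest are left for a downstream filter) and keeps the existing projection.
    SourceState applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>(pushedPredicates);
        for (Iterator<String> it = predicates.iterator(); it.hasNext(); ) {
            String p = it.next();
            if (supported.test(p)) {
                accepted.add(p);
                it.remove();
            }
        }
        return new SourceState(projectedFields, Collections.unmodifiableList(accepted), true, supported);
    }

    boolean isFilterPushedDown() {
        return filterPushedDown;
    }

    // Describes both push-downs, so differently pushed-down copies get different descriptions.
    String explainSource() {
        String fields = projectedFields == null ? "*" : Arrays.toString(projectedFields);
        return "MySource(fields=" + fields + ", filter=" + pushedPredicates + ")";
    }
}
```

Whichever order the two push-downs are applied in, the final copy carries both the projection and the filter, and its description differs from the original source.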
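Regarding item 2: the condition in the AssertionError is a nested OR, i.e. OR(a, OR(b, c)). If I am reading Calcite's Filter constructor correctly, it asserts that its condition is flat (RexUtil.isFlat), which would suggest that the remaining condition rebuilt in PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan is not flattened before LogicalFilter.copy is called. To illustrate what "flat" means here, this sketch uses a hypothetical toy expression type (Expr, Leaf and Call, not Calcite's RexNode) and merges nested calls of the same AND/OR operator into a single n-ary call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy expression tree (not Calcite's RexNode).
abstract class Expr {}

// A leaf such as an input reference ($16) or a literal ('Home').
final class Leaf extends Expr {
    final String text;
    Leaf(String text) { this.text = text; }
    @Override public String toString() { return text; }
}

// An operator call such as =($16, 'Home') or OR(a, b, c).
final class Call extends Expr {
    final String op;
    final List<Expr> operands;
    Call(String op, Expr... operands) { this(op, Arrays.asList(operands)); }
    Call(String op, List<Expr> operands) { this.op = op; this.operands = operands; }
    @Override public String toString() {
        StringBuilder sb = new StringBuilder(op).append('(');
        for (int i = 0; i < operands.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(operands.get(i));
        }
        return sb.append(')').toString();
    }
}

final class Flatten {
    private static boolean isAssociative(String op) {
        return op.equals("AND") || op.equals("OR");
    }

    // True if no AND/OR call has a direct operand that is a call to the same operator.
    static boolean isFlat(Expr e) {
        if (!(e instanceof Call)) return true;
        Call call = (Call) e;
        for (Expr operand : call.operands) {
            if (isAssociative(call.op) && operand instanceof Call && ((Call) operand).op.equals(call.op)) {
                return false;
            }
            if (!isFlat(operand)) return false;
        }
        return true;
    }

    // Bottom-up: OR(a, OR(b, c)) becomes OR(a, b, c); other operators keep their shape.
    static Expr flatten(Expr e) {
        if (!(e instanceof Call)) return e;
        Call call = (Call) e;
        List<Expr> out = new ArrayList<>();
        for (Expr operand : call.operands) {
            Expr f = flatten(operand);
            if (isAssociative(call.op) && f instanceof Call && ((Call) f).op.equals(call.op)) {
                out.addAll(((Call) f).operands);   // splice the child's operands into the parent
            } else {
                out.add(f);
            }
        }
        return new Call(call.op, out);
    }
}
```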
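Regarding item 3: what I expected is roughly the equivalent of evaluating currentTimestamp() once per query and substituting that same literal everywhere before applyPredicate is called. The sketch below models this with hypothetical toy types (Node, Ref, TimestampLit and Fn are not Flink's Expression or CallExpression classes): the timestamp is captured once and every currentTimestamp() call in the predicate trees is replaced by that single literal.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy predicate tree (not Flink's Expression/CallExpression classes).
abstract class Node {}

// Column reference such as C2.
final class Ref extends Node {
    final String name;
    Ref(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

// Timestamp constant that replaces currentTimestamp().
final class TimestampLit extends Node {
    final Instant value;
    TimestampLit(Instant value) { this.value = value; }
    @Override public String toString() { return "TIMESTAMP '" + value + "'"; }
}

// Function call such as greaterThanOrEqual(C2, currentTimestamp()).
final class Fn extends Node {
    final String name;
    final List<Node> args;
    Fn(String name, Node... args) { this(name, Arrays.asList(args)); }
    Fn(String name, List<Node> args) { this.name = name; this.args = args; }
    @Override public String toString() {
        StringBuilder sb = new StringBuilder(name).append('(');
        for (int i = 0; i < args.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(args.get(i));
        }
        return sb.append(')').toString();
    }
}

final class TimeFolding {
    // Replaces every zero-argument currentTimestamp() call with a literal for queryStart.
    static Node fold(Node n, Instant queryStart) {
        if (!(n instanceof Fn)) return n;
        Fn fn = (Fn) n;
        if (fn.name.equals("currentTimestamp") && fn.args.isEmpty()) {
            return new TimestampLit(queryStart);
        }
        List<Node> folded = new ArrayList<>();
        for (Node arg : fn.args) folded.add(fold(arg, queryStart));
        return new Fn(fn.name, folded);
    }

    // Folds all predicates of one query against the same captured instant.
    static List<Node> foldAll(List<Node> predicates, Instant queryStart) {
        List<Node> out = new ArrayList<>();
        for (Node p : predicates) out.add(fold(p, queryStart));
        return out;
    }
}
```

The instant is passed in explicitly here (in practice it would be captured once when the query starts) so that every predicate of the query sees exactly the same value.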
Regards,
Satyam