Hello,

I am using Flink 1.11.2 as the execution engine for an alerting
application. Our application builds atop Flink's SQL API to run streaming
and batch jobs on a proprietary storage engine. We have a custom
StreamTableSource implementation that connects to our storage engine. The
connector currently implements the ProjectableTableSource interface. I now
wish to extend the connector to push down filters to the source for
improved performance. I have run into multiple issues in that effort:

1. The optimizer does not apply both ProjectableTableSource and
FilterableTableSource in a single query, even when the source implements
both interfaces. Each interface works correctly when implemented
independently.
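For reference, a minimal sketch of a source implementing both push-down
hooks. Note this uses simplified stand-in interfaces and String predicates
purely for illustration; the real legacy interfaces live in
org.apache.flink.table.sources and operate on TableSource, TableSchema, and
Expression types.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Flink's legacy push-down interfaces
// (hypothetical; the real methods return a new TableSource instance).
interface Projectable { Object projectFields(int[] fields); }
interface Filterable {
    Object applyPredicate(List<String> predicates);
    boolean isFilterPushedDown();
}

// A source implementing both interfaces, mirroring the connector described
// above. Each push-down produces a new immutable copy of the source.
class CustomSource implements Projectable, Filterable {
    private final int[] projectedFields;
    private final List<String> pushedPredicates;

    CustomSource(int[] projectedFields, List<String> pushedPredicates) {
        this.projectedFields = projectedFields;
        this.pushedPredicates = pushedPredicates;
    }

    @Override
    public Object projectFields(int[] fields) {
        // Carry the projection forward; previously pushed predicates must be kept.
        return new CustomSource(fields, pushedPredicates);
    }

    @Override
    public Object applyPredicate(List<String> predicates) {
        // Accept every predicate; any left in the list stays in the plan
        // as a regular Filter node above the scan.
        List<String> accepted = new ArrayList<>(predicates);
        predicates.clear();
        return new CustomSource(projectedFields, accepted);
    }

    @Override
    public boolean isFilterPushedDown() {
        return !pushedPredicates.isEmpty();
    }
}
```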

2. Implementations of FilterableTableSource fail inside the optimizer for a
few TPC-DS queries in batch mode.

Stacktrace:

java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16, _UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
   at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
   at org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
   at org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
   at org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
   at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
   ...
   at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:392)
   at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
   at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
   ...
   at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)


Config:
  var settings = EnvironmentSettings.newInstance()
                                    .useBlinkPlanner()
                                    .inBatchMode()
                                    .build();
  var tableEnv = TableEnvironment.create(settings);

3. Finally, filter expressions containing the current_timestamp (and now())
functions are not resolved to constant values during predicate push-down
optimization. Take the following SQL query as an example:

  select count(*) from T0 where T0.C2 >= current_timestamp

Here, the applyPredicate method of FilterableTableSource receives the
predicate as a CallExpression of the form greaterThanOrEqual(C2,
currentTimestamp()). I would have expected currentTimestamp to be resolved
to a constant value that is identical across all usages of
current_timestamp in the query.
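As a stopgap, one workaround sketch (an assumption on my part, not a fix for
the planner behavior) is to resolve the timestamp on the client and inline
it as a literal, so the pushed-down predicate contains a constant rather
than an unresolved currentTimestamp() call. The buildQuery helper and its
timestamp format are hypothetical:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class InlineTimestamp {
    // Build the query with the current time baked in as a SQL TIMESTAMP
    // literal, evaluated once on the client.
    static String buildQuery(LocalDateTime now) {
        String literal =
            now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        return "select count(*) from T0 where T0.C2 >= TIMESTAMP '"
            + literal + "'";
    }
}
```

This keeps every usage of the timestamp in the query identical, at the cost
of fixing it at submission time rather than plan time.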

Regards,
Satyam
