Piotr Nowojski created FLINK-10734:
--------------------------------------

             Summary: Temporal joins on heavily filtered tables might fail in planning
                 Key: FLINK-10734
                 URL: https://issues.apache.org/jira/browse/FLINK-10734
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.7.0
            Reporter: Piotr Nowojski
The following query:
{code}
val sqlQuery =
  """
    |SELECT
    |  o.amount * r.rate AS amount
    |FROM
    |  Orders AS o,
    |  LATERAL TABLE (Rates(o.rowtime)) AS r
    |WHERE r.currency = o.currency
    |""".stripMargin
{code}
with {{Rates}} defined as follows:
{code}
tEnv.registerTable("EuroRatesHistory", tEnv.scan("RatesHistory").filter('currency === "Euro"))
tEnv.registerFunction(
  "Rates",
  tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 'currency))
{code}
fails with:
{noformat}
org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, rowtime, currency, rate, rowtime0))]
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
	at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
{noformat}
The problem is that the filtering condition {{('currency === "Euro")}} interferes with the join condition. Because the right input only contains rows with {{currency = 'Euro'}}, the planner rewrites {{r.currency = o.currency}} into {{'Euro' = o.currency}} and pushes it down into the {{Orders}} input. The temporal join is then left without any equi-join key on {{currency}}, hence {{Found []}}.
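To make the failure mode concrete, here is a minimal, self-contained Scala sketch of a hypothetical equi-join key extractor. The names {{JoinKeySketch}}, {{extractJoinKeys}} and {{validateSingleKey}} are illustrative assumptions, not Flink's API, and the real check in {{DataStreamTemporalJoinToCoProcessTranslator}} is more involved. The sketch assumes the join row layout used by the planner ({{Orders}} fields at {{$0..$2}}, {{EuroRatesHistory}} fields at {{$3..$5}}) and shows that once {{r.currency = o.currency}} has become {{'Euro' = o.currency}}, no key pair relating the two inputs is left, producing the same {{Found []}} wording.

```scala
// Hypothetical, simplified model of equi-join key extraction.
// It is NOT Flink's implementation; names and structure are illustrative only.
object JoinKeySketch {
  sealed trait Expr
  final case class FieldRef(index: Int) extends Expr
  final case class Literal(value: String) extends Expr
  final case class Equals(left: Expr, right: Expr) extends Expr

  /** Collects (leftField, rightField) pairs from equality conjuncts that compare
    * a field of the left input with a field of the right input.
    * Fields 0 until leftArity belong to the left input, the rest to the right one. */
  def extractJoinKeys(conjuncts: Seq[Expr], leftArity: Int): Seq[(Int, Int)] =
    conjuncts.collect {
      case Equals(FieldRef(a), FieldRef(b)) if a < leftArity && b >= leftArity => (a, b)
      case Equals(FieldRef(a), FieldRef(b)) if b < leftArity && a >= leftArity => (b, a)
    }

  /** Requires exactly one key pair, mimicking the wording of the reported error. */
  def validateSingleKey(keys: Seq[(Int, Int)]): (Int, Int) = keys match {
    case Seq(single) => single
    case other =>
      throw new IllegalArgumentException(
        s"Only single column join key is supported. Found ${other.mkString("[", ", ", "]")}")
  }

  def main(args: Array[String]): Unit = {
    // Join row layout: Orders amount=$0, currency=$1, rowtime=$2;
    // EuroRatesHistory currency=$3, rate=$4, rowtime=$5.
    val leftArity = 3
    val original  = Seq(Equals(FieldRef(3), FieldRef(1)))     // r.currency = o.currency
    val rewritten = Seq(Equals(Literal("Euro"), FieldRef(1))) // 'Euro' = o.currency
    println(validateSingleKey(extractJoinKeys(original, leftArity))) // prints (1,3)
    try {
      validateSingleKey(extractJoinKeys(rewritten, leftArity))
      println("unexpected: a join key was found")
    } catch {
      case e: IllegalArgumentException => println(e.getMessage) // message ends with "Found []"
    }
  }
}
```

The rewritten predicate is still a valid filter for the query, but it only references the left input, so in this model nothing remains that a key extractor could match against the temporal table's primary key.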
Note how the top {{LogicalFilter(condition=[=($3, $1)])}} changes during optimization (the reference {{$3}} to the rates' {{currency}} is replaced by the constant {{_UTF-16LE'Euro'}}) and finally disappears from above the join, surviving only as the condition of the {{FlinkLogicalCalc}} over {{_DataStreamTable_0}}:
{noformat}
LogicalProject(amount=[*($0, $4)])
  LogicalFilter(condition=[=($3, $1)])
    LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
      LogicalTableScan(table=[[_DataStreamTable_0]])
      LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
        LogicalTableScan(table=[[_DataStreamTable_1]])
{noformat}
{noformat}
LogicalProject(amount=[*($0, $4)])
  LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
    LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
      LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
        LogicalTableScan(table=[[_DataStreamTable_0]])
        LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
          LogicalTableScan(table=[[_DataStreamTable_1]])
{noformat}
{noformat}
FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
  FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, $2)], joinType=[inner])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
      FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
    FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
      FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
{noformat}
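The two rewrites visible in these plans can be modeled in a few lines: the constant from the right-side filter is substituted into the top filter, and the resulting predicate, which now only references the left input, can be evaluated below the join. This is a hypothetical plain-Scala sketch, not Calcite's or Flink's rule implementation; {{ConstantPushdownSketch}}, {{propagateConstants}} and {{placement}} are illustrative names, and the field indexes are taken from the logical plans.

```scala
// Hypothetical, simplified model of the rewrites visible in the plans above.
// It is NOT Calcite's or Flink's rule implementation.
object ConstantPushdownSketch {
  sealed trait Expr
  final case class FieldRef(index: Int) extends Expr
  final case class Literal(value: String) extends Expr
  final case class Equals(left: Expr, right: Expr) extends Expr

  /** Replaces every field whose value is known to be constant. */
  def propagateConstants(e: Expr, constants: Map[Int, Literal]): Expr = e match {
    case FieldRef(i)  => constants.getOrElse(i, e)
    case Equals(l, r) => Equals(propagateConstants(l, constants), propagateConstants(r, constants))
    case lit: Literal => lit
  }

  /** Field indexes referenced by an expression. */
  def fields(e: Expr): Set[Int] = e match {
    case FieldRef(i)  => Set(i)
    case Equals(l, r) => fields(l) ++ fields(r)
    case _: Literal   => Set.empty
  }

  /** Where a predicate can be evaluated, given that fields 0 until leftArity come from the left input. */
  def placement(e: Expr, leftArity: Int): String = {
    val fs = fields(e)
    if (fs.isEmpty) "constant"
    else if (fs.forall(_ < leftArity)) "left input"
    else if (fs.forall(_ >= leftArity)) "right input"
    else "join"
  }

  def main(args: Array[String]): Unit = {
    val leftArity = 3                                // Orders: amount=$0, currency=$1, rowtime=$2
    val topFilter = Equals(FieldRef(3), FieldRef(1)) // =($3, $1), i.e. r.currency = o.currency
    val constants = Map(3 -> Literal("Euro"))        // right input is filtered on currency = 'Euro'
    val rewritten = propagateConstants(topFilter, constants)
    println(rewritten)                       // prints Equals(Literal(Euro),FieldRef(1))
    println(placement(topFilter, leftArity)) // prints join
    println(placement(rewritten, leftArity)) // prints left input
  }
}
```

The "left input" result corresponds to the final plan, where the condition appears only as {{=($t3, $t1)}} in the {{FlinkLogicalCalc}} over {{_DataStreamTable_0}}.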