[ 
https://issues.apache.org/jira/browse/FLINK-22454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348075#comment-17348075
 ] 

Wenlong Lyu commented on FLINK-22454:
-------------------------------------

Hi, [~fsk119], I think this error is by design. A lookup join must have lookup 
keys in the join condition. We ignore casts on field refs while extracting lookup 
keys (see CommonPhysicalLookupJoin#getIdenticalSourceField) in order to support 
lookup keys which have interoperable types (see 
PlannerTypeUtils#isInteroperable), and added a check on the types of the lookup 
keys to make sure that the types are really interoperable. 

In your case, the cast cannot be ignored (INT and DECIMAL are not 
interoperable), so the planner throws an error about incompatible types on the 
lookup key.

> Failed to translate Lookup Join when join on a CAST expression on dimension 
> table column
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-22454
>                 URL: https://issues.apache.org/jira/browse/FLINK-22454
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.0
>            Reporter: Shengkai Fang
>            Priority: Major
>
> Please add the following test to {{LookupJoinTest}}:
> {code:java}
>  def before(): Unit ={
>     util.addDataStream[(Int, String, Long)](
>       "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
>     if (legacyTableSource) {
>       TestTemporalTable.createTemporaryTable(util.tableEnv, "LookupTable")
>     } else {
>       util.addTable(
>         """
>           |CREATE TABLE LookupTable (
>           |  `id` DECIMAL(38, 10),
>           |  `to_qty` DECIMAL(38, 10),
>           |  `name` STRING,
>           |  `age` INT,
>           |  `id_int` as CAST(`id` AS INT)
>           |) WITH (
>           |  'connector' = 'values'
>           |)
>           |""".stripMargin)
>   }
> {code}
> {code:java}
> @Test
>   def test(): Unit = {
>     val sql =
>     """
>     |SELECT MyTable.b, LookupTable.`to_qty`
>     |FROM MyTable
>     |LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF MyTable.`proctime`
>     |ON MyTable.a = CAST(LookupTable.`id` as INT)
>     |""".stripMargin
>     util.tableEnv.sqlQuery(sql).explain()
>   }
> {code}
> The exception stack is 
> {code}
> org.apache.flink.table.api.TableException: Temporal table join requires 
> equivalent condition of the same type, but the condition is 
> a[INT]=id[DECIMAL(38, 10)]
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validateLookupKeyType(CommonExecLookupJoin.java:303)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:222)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>       at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
>       at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
>       at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
>       at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>       at 
> org.apache.flink.table.planner.plan.stream.sql.join.LookupJoinTest.test(LookupJoinTest.scala:197)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runners.Suite.runChild(Suite.java:128)
>       at org.junit.runners.Suite.runChild(Suite.java:27)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>       at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>       at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to