This may be down to my not understanding lateral joins in Flink (perhaps you
can only use them on temporal variables), but I figured I’d ask since the
error message isn’t intuitive.

I am trying to combine a lateral join with a Top-N query. Part of my ordering
depends on whether a value from the left side of the join (t1.attr4) matches
the corresponding value on the right side. The general form is:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2
  )
  WHERE row_num = 1
) t2
ON (t1.id = t2.id)
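
To make the intent of the query concrete, here is a minimal sketch in plain
Python (not Flink code) of the result it is meant to produce: for each row of
table1, take the single table2 row with the same id, ordered by attr3
descending and then by whether attr4 matches the left row's attr4. The sample
rows and the function names are made up for illustration, and the sketch
assumes no NULL values in attr3 or attr4.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical sample data, only for illustration.
table1 = [
    {"id": 1, "attr1": "a", "attr4": "x"},
    {"id": 2, "attr1": "b", "attr4": "y"},
    {"id": 3, "attr1": "c", "attr4": "z"},
]
table2 = [
    {"id": 1, "attr2": "p", "attr3": 5, "attr4": "w"},
    {"id": 1, "attr2": "q", "attr3": 5, "attr4": "x"},
    {"id": 1, "attr2": "r", "attr3": 4, "attr4": "x"},
    {"id": 2, "attr2": "s", "attr3": 1, "attr4": "n"},
]


def top_match(left: Dict, right_rows: List[Dict]) -> Optional[Dict]:
    """Return the row_num = 1 row of table2 for one table1 row, or None."""
    candidates = [r for r in right_rows if r["id"] == left["id"]]
    if not candidates:
        return None  # LEFT JOIN: the left row is kept with a NULL attr2
    # ORDER BY attr3 DESC, (t1.attr4 = attr4) DESC, keep the first row.
    return max(
        candidates,
        key=lambda r: (r["attr3"], r["attr4"] == left["attr4"]),
    )


def lateral_top1(left_rows: List[Dict], right_rows: List[Dict]) -> List[Tuple]:
    """Emulate the lateral Top-1 join row by row."""
    result = []
    for left in left_rows:
        best = top_match(left, right_rows)
        attr2 = best["attr2"] if best is not None else None
        result.append((left["id"], left["attr1"], attr2))
    return result


print(lateral_top1(table1, table2))
```

For id 1, two table2 rows tie on attr3 = 5, and the one whose attr4 equals the
left row's attr4 wins the tie. That dependence of the ordering on t1.attr4 is
the correlated part of the query.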

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
     at scala.collection.Iterator.foreach(Iterator.scala:943)
     at scala.collection.Iterator.foreach$(Iterator.scala:943)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
     at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
     at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
     at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other approach I can think of is writing a Table Aggregate function
to pull this off. Before going down that route, I wanted to check whether I am
doing something wrong in the query above, or whether there is another option I
haven’t thought of.

Regards,
Dylan Forciea
