Dylan,

Thanks for your feedback. If the planner encounters an
"unexpected correlate variable $cor2 in the plan" exception,
there's a high probability that FlinkDecorrelateProgram has a bug
or the query pattern is not yet supported. I tried using the JDBC
connector for the input tables, but I still could not reproduce the
exception. Could you provide your full code, including the DDL,
query, etc.?
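
In case it helps, the JDBC source tables in my reproduction attempt were declared roughly like this (the connector options below are placeholders, not a real database):

```sql
-- Sketch of a JDBC source table DDL; the URL, credentials, and physical
-- table name are placeholders standing in for a real setup.
CREATE TABLE table1 (
  id VARCHAR,
  attr1 VARCHAR,
  attr2 VARCHAR,
  attr3 VARCHAR,
  attr4 VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',
  'table-name' = 'table1',
  'username' = 'user',
  'password' = 'password'
);
```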
Thanks so much.

Best,
Godfrey



Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 10:09 PM:

> Godfrey,
>
>
>
> I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and
> am still having the same issue. Note that I am using the JDBC Connector for
> the input tables, and table1 and table2 are actually created from queries
> on those connector tables and not directly.
>
>
>
> Since you indicated what I did should work, I played around a bit more
> and determined that it's something inside the table2 query that is triggering
> the error. The id field there is generated by a table function; removing
> that piece made the plan start working. table2 is formulated as follows:
>
>
>
> SELECT
>   T.id,
>   attr2,
>   attr3,
>   attr4
> FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)
>
>
>
> Where SplitStringToRows is defined as:
>
>
>
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
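>
> For context, I register and invoke the function roughly like this (the
> class path in the registration is illustrative, not my actual package):
>
> ```sql
> -- Hypothetical registration of SplitStringToRows (class path assumed),
> -- followed by the lateral join exactly as used in the table2 query:
> CREATE TEMPORARY FUNCTION SplitStringToRows
>   AS 'com.example.SplitStringToRows';
>
> SELECT T.id, attr2, attr3, attr4
> FROM table3 t3,
> LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id);
> ```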
>
>
>
> Removing the lateral table bit in that first table made the original query
> plan work correctly.
>
>
>
> I greatly appreciate your assistance!
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
> *From: *godfrey he <godfre...@gmail.com>
> *Date: *Wednesday, November 18, 2020 at 7:33 AM
> *To: *Dylan Forciea <dy...@oseberg.io>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Lateral join not finding correlate variable
>
>
>
> Hi Dylan,
>
>
>
> Could you tell us which Flink version you are seeing the problem on?
>
> I tested the above query on master, and I got a plan; no errors occurred.
>
> Here is my test case:
>
> @Test
> def testLateralJoin(): Unit = {
>   util.addTableSource[(String, String, String, String, String)]("table1",
>     'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   util.addTableSource[(String, String, String, String, String)]("table2",
>     'id, 'attr1, 'attr2, 'attr3, 'attr4)
>   val query =
>     """
>       |SELECT
>       |  t1.id,
>       |  t1.attr1,
>       |  t2.attr2
>       |FROM table1 t1
>       |LEFT JOIN LATERAL (
>       |  SELECT
>       |    id,
>       |    attr2
>       |  FROM (
>       |    SELECT
>       |      id,
>       |      attr2,
>       |      ROW_NUMBER() OVER (
>       |        PARTITION BY id
>       |        ORDER BY
>       |          attr3 DESC,
>       |          t1.attr4 = attr4 DESC
>       |      ) AS row_num
>       |    FROM table2)
>       |    WHERE row_num = 1) t2
>       |ON t1.id = t2.id
>       |""".stripMargin
>   util.verifyPlan(query)
> }
>
> Best,
>
> Godfrey
>
>
>
> Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 7:44 AM:
>
> This may be due to my not understanding lateral joins in Flink – perhaps
> you can only do them on temporal tables – but I figured I'd ask since the
> error message isn't intuitive.
>
>
>
> I am trying to do a combination of a lateral join and a top-N query. Part
> of my ordering is based upon whether a value in the left side of the
> query matches up. I'm trying to do this in the general form of:
>
>
>
> SELECT
>   t1.id,
>   t1.attr1,
>   t2.attr2
> FROM table1 t1
> LEFT JOIN LATERAL (
>   SELECT
>     id,
>     attr2
>   FROM (
>     SELECT
>       id,
>       attr2,
>       ROW_NUMBER() OVER (
>         PARTITION BY id
>         ORDER BY
>           attr3 DESC,
>           t1.attr4 = attr4 DESC
>       ) AS row_num
>     FROM table2)
>   WHERE row_num = 1) t2
> ON (t1.id = t2.id)
>
>
>
> I am getting an error that looks like:
>
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
>      at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
>      at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>      at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>      at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>      at scala.collection.Iterator.foreach(Iterator.scala:943)
>      at scala.collection.Iterator.foreach$(Iterator.scala:943)
>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>      at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>      at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>      at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>      at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>      at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>      at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>      at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>      at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>      at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>      at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
>      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
>      at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
>      at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
>      at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
>      at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
>      at io.oseberg.flink.well.ok.Job.main(Job.scala)
>
>
>
> The only other thing I can think of doing is creating a Table Aggregate
> function to pull this off. But I wanted to check first whether I was doing
> something wrong in the above, or whether there is an approach I'm not
> thinking of.
>
>
>
> Regards,
>
> Dylan Forciea
>
>
