Godfrey,

I confirmed that in Flink 1.11.2 and in 1.12-SNAPSHOT I get the stack trace 
running exactly this code:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.TableFunction


@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {
  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}
object Job {

  def main(args: Array[String]): Unit = {
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    streamTableEnv.createTemporarySystemFunction(
      "SplitStringToRows",
      classOf[SplitStringToRows]
    ) // Class defined in previous email

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id_source BIGINT PRIMARY KEY,
        attr1_source STRING,
        attr2 STRING
      ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
       'table-name' = '<table>',
       'username' = '<username>',
       'password' = '<password>',
       'scan.fetch-size' = '500',
       'scan.auto-commit' = 'false')
    """)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr1_source STRING,
        attr2 STRING,
        attr3 DECIMAL,
        attr4 DATE
      ) WITH (
       'connector' = 'jdbc',
       'url' = 'jdbc:postgresql://host.domain.com/db1?ssl=true',
       'table-name' = '<table>',
       'username' = '<username>',
       'password' = '<password>',
       'scan.fetch-size' = '500',
       'scan.auto-commit' = 'false')
    """)

    val q1 = streamTableEnv.sqlQuery("""
      SELECT
        id_source AS id,
        attr1_source AS attr1,
        attr2
      FROM table1
    """)
    streamTableEnv.createTemporaryView("view1", q1)

    val q2 = streamTableEnv.sqlQuery(
      """
        SELECT
          a.attr1 AS attr1,
          attr2,
          attr3,
          attr4
        FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1_source, ';')) AS a(attr1)
    """)
    streamTableEnv.createTemporaryView("view2", q2)

    val q3 = streamTableEnv.sqlQuery("""
        SELECT
          w.attr1,
          p.attr3
        FROM view1 w
        LEFT JOIN LATERAL (
          SELECT
            attr1,
            attr3
          FROM (
            SELECT
              attr1,
              attr3,
              ROW_NUMBER() OVER (
                PARTITION BY attr1
                ORDER BY
                  attr4 DESC NULLS LAST,
                  w.attr2 = attr2 DESC NULLS LAST
              ) AS row_num
          FROM view2)
          WHERE row_num = 1) p
        ON (w.attr1 = p.attr1)
        """)
    streamTableEnv.createTemporaryView("view3", q3)

    val view3 = streamTableEnv.from("view3")

    view3
      .toRetractStream[Row]
      .writeAsCsv("./view3.csv", WriteMode.OVERWRITE)
      .setParallelism(1)

    streamEnv.execute()
  }
}

Thanks,
Dylan Forciea

From: godfrey he <godfre...@gmail.com>
Date: Wednesday, November 18, 2020 at 8:29 PM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Dylan,

Thanks for your feedback. If the planner throws the
"unexpected correlate variable $cor2 in the plan" exception,
there is a high probability that FlinkDecorrelateProgram has a bug
or that the query pattern is not supported yet. I tried using the JDBC
connector for the input tables, but I still cannot reproduce the exception.
Could you provide your full code, including the DDL, queries, etc.?
Thanks so much.

Best,
Godfrey



Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 
10:09 PM:
Godfrey,

I was using Flink 1.11.2, but I just tried switching to 1.12-SNAPSHOT and am 
still having the same issue. Note that I am using the JDBC Connector for the 
input tables, and table1 and table2 are actually created from queries on those 
connector tables and not directly.

Since you indicated what I did should work, I played around a bit more, and 
determined it’s something inside of the table2 query that is triggering the 
error. The id field there is generated by a table function. Removing that piece 
made the plan start working. Table 2 is formulated as follows:

SELECT
  T.id,
  attr2,
  attr3,
  attr4
FROM table3 t3, LATERAL TABLE(SplitStringToRows(t3.id, ';')) AS T(id)

Where SplitStringToRows is defined as:

@FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
class SplitStringToRows extends TableFunction[Row] {

  def eval(str: String, separator: String = ";"): Unit = {
    if (str != null) {
      str.split(separator).foreach(s => collect(Row.of(s.trim())))
    }
  }
}

Removing the lateral table bit in that first table made the original query plan 
work correctly.
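
For reference, the string handling inside eval can be exercised outside Flink. 
The sketch below is plain Scala and assumes nothing about the planner; the 
names SplitSketch, splitAndTrim and splitLiteral are hypothetical and are not 
part of Flink or of the job in this thread. It also makes one detail visible: 
String.split treats the separator as a regular expression, so a separator with 
regex meaning (for example "|") would need quoting, whereas ";" is safe as-is.

```scala
import java.util.regex.Pattern

// Hypothetical helper object; mirrors the body of SplitStringToRows.eval
// without any Flink dependency.
object SplitSketch {

  // Same logic as eval: a null input produces no rows; otherwise the string
  // is split on `separator` (interpreted as a regex) and each piece trimmed.
  def splitAndTrim(str: String, separator: String = ";"): Seq[String] =
    if (str == null) Seq.empty
    else str.split(separator).toSeq.map(_.trim)

  // Variant that treats the separator literally by quoting it first.
  def splitLiteral(str: String, separator: String): Seq[String] =
    if (str == null) Seq.empty
    else str.split(Pattern.quote(separator)).toSeq.map(_.trim)
}
```

Each element of the returned sequence corresponds to one Row that the table 
function would collect for a given input string.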

I greatly appreciate your assistance!

Regards,
Dylan Forciea

From: godfrey he <godfre...@gmail.com>
Date: Wednesday, November 18, 2020 at 7:33 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Lateral join not finding correlate variable

Hi Dylan,

Could you tell me which Flink version you hit the problem with?
I tested the above query on master and got the plan with no errors.
Here is my test case:

@Test
def testLateralJoin(): Unit = {
  util.addTableSource[(String, String, String, String, String)](
    "table1", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  util.addTableSource[(String, String, String, String, String)](
    "table2", 'id, 'attr1, 'attr2, 'attr3, 'attr4)
  val query =
    """
      |SELECT
      |  t1.id,
      |  t1.attr1,
      |  t2.attr2
      |FROM table1 t1
      |LEFT JOIN LATERAL (
      |  SELECT
      |    id,
      |    attr2
      |  FROM (
      |    SELECT
      |      id,
      |      attr2,
      |      ROW_NUMBER() OVER (
      |        PARTITION BY id
      |        ORDER BY
      |          attr3 DESC,
      |          t1.attr4 = attr4 DESC
      |      ) AS row_num
      |    FROM table2)
      |    WHERE row_num = 1) t2
      |ON t1.id = t2.id
      |""".stripMargin
  util.verifyPlan(query)
}
Best,
Godfrey

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, November 18, 2020 at 
7:44 AM:
This may be due to my not understanding lateral joins in Flink (perhaps you 
can only do so on temporal variables), but I figured I’d ask since the error 
message isn’t intuitive.

I am trying to do a combination of a lateral join and a top-N query. Part of my 
ordering is based upon whether a value on the left side of the query 
matches up. I’m trying to do this in the general form of:

SELECT
  t1.id,
  t1.attr1,
  t2.attr2
FROM table1 t1
LEFT JOIN LATERAL (
  SELECT
    id,
    attr2
  FROM (
    SELECT
      id,
      attr2,
      ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY
          attr3 DESC,
          t1.attr4 = attr4 DESC
      ) AS row_num
    FROM table2)
  WHERE row_num = 1) t2
ON (t1.id = t2.id)
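
To make the intended result of the query above concrete, here is a minimal 
plain-Scala sketch of the semantics, independent of Flink. The object 
LateralTop1Sketch, the case classes LeftRow and RightRow, the column types, and 
the method top1PerLeft are all hypothetical stand-ins for table1 and table2. 
The sketch also assumes non-null values, so it does not model SQL NULL 
comparison or NULLS LAST ordering.

```scala
// Hypothetical in-memory model of the lateral top-1 join; not Flink API.
object LateralTop1Sketch {

  // Stand-ins for the columns of table1 and table2 that the query touches.
  final case class LeftRow(id: String, attr1: String, attr4: String)
  final case class RightRow(id: String, attr2: String, attr3: Int, attr4: String)

  // For each left row: take the right rows with the same id, order them by
  // attr3 DESC and then by whether attr4 equals the left row's attr4
  // (matches first), and keep only the first one (row_num = 1).
  // A left row with no matching right row is kept with None, as in LEFT JOIN.
  def top1PerLeft(
      left: Seq[LeftRow],
      right: Seq[RightRow]): Seq[(String, String, Option[String])] =
    left.map { l =>
      val best = right
        .filter(_.id == l.id)
        .sortBy(r => (-r.attr3, if (r.attr4 == l.attr4) 0 else 1))
        .headOption
      (l.id, l.attr1, best.map(_.attr2))
    }
}
```

The second sort key is what requires the correlation with the left side: the 
ranking of the right rows depends on the attr4 value of the current left row.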

I am getting an error that looks like:

Exception in thread "main" org.apache.flink.table.api.TableException: unexpected correlate variable $cor2 in the plan
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
     at scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
     at scala.collection.Iterator.foreach(Iterator.scala:943)
     at scala.collection.Iterator.foreach$(Iterator.scala:943)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:294)
     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
     at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
     at org.apache.flink.table.api.bridge.scala.TableConversions.toRetractStream(TableConversions.scala:97)
     at io.oseberg.flink.well.ok.Job$.main(Job.scala:57)
     at io.oseberg.flink.well.ok.Job.main(Job.scala)

The only other thing I can think of doing is creating a Table Aggregate 
function to pull this off. But, I wanted to check to make sure I wasn’t doing 
something wrong in the above first, or if there is something I’m not thinking 
of doing.

Regards,
Dylan Forciea
