Hi,
Flink 1.14.0
Scala 2.12
Java 11

Given the following program:

package com.flink.bug

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/** Minimal reproduction for FLINK-24956.
  *
  * Registers two temporary views backed by single-element `Person` streams and
  * runs a query that joins two CTEs using `FOR SYSTEM_TIME AS OF PROCTIME()`.
  * On Flink 1.14.0 the reported outcome is a `NullPointerException` thrown from
  * `SqlValidatorImpl.validateSnapshot` while the statement is being validated.
  *
  * The SQL text is intentionally left as reported; only the layout of the
  * program was restored so that it compiles and `stripMargin` sees one `|`
  * margin marker at the start of every line of the query.
  */
object Test {

  /** Row type backing both temporary views. */
  final case class Person(name: String, age: Int)

  /** Builds the environments, registers the views and executes the failing query.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val ee = StreamExecutionEnvironment.getExecutionEnvironment
    val te = StreamTableEnvironment.create(ee)

    // Explicit schema shared by both views: (name STRING, age INT).
    val personSchema =
      Schema
        .newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .build()

    val x = ee.fromCollection(List(Person("a", 1)))
    te.createTemporaryView("my_table", x, personSchema)

    val y = ee.fromCollection(List(Person("b", 2)))
    te.createTemporaryView("my_table_2", y, personSchema)

    // Each line of the literal starts with the `|` margin marker so that
    // stripMargin removes the indentation and leaves plain SQL.
    val res =
      te.executeSql("""
                      |WITH A AS (
                      |  select name, age + 1 from my_table
                      |),
                      |B AS (
                      |  select name, age + 2 from my_table_2
                      |)
                      |
                      |SELECT A.name, B.age
                      |FROM A
                      |JOIN B
                      |FOR SYSTEM_TIME AS OF PROCTIME() on (A.name = B.name)
                      |""".stripMargin)
    res.print()
  }
}


Running this program produces a NullPointerException with the following stack trace:

Caused by: java.lang.NullPointerException
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSnapshot(SqlValidatorImpl.java:4714)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:986)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
        at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
        at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
        at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
        at 
org.apache.calcite.sql.validate.WithNamespace.validateImpl(WithNamespace.java:57)
        at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWith(SqlValidatorImpl.java:3744)
        at org.apache.calcite.sql.SqlWith.validate(SqlWith.java:71)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
        at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)

I have opened an issue on this here:
https://issues.apache.org/jira/browse/FLINK-24956

I was wondering whether anyone has a workaround in mind.


-- 
Best Regards,
Yuval Itzchakov.

Reply via email to