Yuval Itzchakov created FLINK-24956:
---------------------------------------

             Summary: SqlSnapshot throws NullPointerException when used in 
conjunction with CTE
                 Key: FLINK-24956
                 URL: https://issues.apache.org/jira/browse/FLINK-24956
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.3, 1.14.0
            Reporter: Yuval Itzchakov


Executing the following program will fail with a NullPointerException:

 
{code:java}
package foo.bar

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Test {
  final case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val ee = StreamExecutionEnvironment.getExecutionEnvironment
    val te = StreamTableEnvironment.create(ee)    
    val personSchema = Schema.newBuilder().column("name", 
DataTypes.STRING()).column("age", DataTypes.INT()).build()
    val x = ee.fromCollection(List(Person("a", 1)))
    te.createTemporaryView(
      "my_table",
      x,
      personSchema
    )
    val y = ee.fromCollection(List(Person("b", 2)))
    te.createTemporaryView(
      "my_table_2",
      y,
      personSchema
    )
    val res =
      te.executeSql("""
                      |WITH A AS (
                      |  select name, age + 1 from my_table
                      |),
                      |B AS (
                      |  select name, age + 2 from my_table_2
                      |)
                      |
                      |SELECT A.name, B.age
                      |FROM A
                      |JOIN B
                      |FOR SYSTEM_TIME AS OF PROCTIME() on (A.name = B.name)
                      |""".stripMargin)
    res.print()
  }
}
{code}
Stacktrace:
{code:java}
Caused by: java.lang.NullPointerException
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSnapshot(SqlValidatorImpl.java:4714)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:986)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
    at 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
    at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    at 
org.apache.calcite.sql.validate.WithNamespace.validateImpl(WithNamespace.java:57)
    at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWith(SqlValidatorImpl.java:3744)
    at org.apache.calcite.sql.SqlWith.validate(SqlWith.java:71)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
 {code}
The reason this fails is that SqlValidatorImpl, when validating the 
SqlSnapshot, always assumes it's operating on a node which has an underlying 
table directly:
{code:java}
            if (!ns.getTable().isTemporal()) {
                List<String> qualifiedName = ns.getTable().getQualifiedName();
                String tableName = qualifiedName.get(qualifiedName.size() - 1);
                throw newValidationError(
                        snapshot.getTableRef(), 
Static.RESOURCE.notTemporalTable(tableName));
            }
 {code}
This is not always the case, as with CTE. 

A simple fix for this would be first checking `ns.getTable` agains't null, and 
only then checking it's temporality.

 

The issue here is that this bug is inside Calcites validator. 

Would love some guidance on how to fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to