[ 
https://issues.apache.org/jira/browse/FLINK-24956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17446935#comment-17446935
 ] 

Yuval Itzchakov commented on FLINK-24956:
-----------------------------------------

[~TsReaper] https://github.com/apache/flink/pull/17845

> SqlSnapshot throws NullPointerException when used in conjunction with CTE
> -------------------------------------------------------------------------
>
>                 Key: FLINK-24956
>                 URL: https://issues.apache.org/jira/browse/FLINK-24956
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.0, 1.13.3
>            Reporter: Yuval Itzchakov
>            Priority: Major
>
> Executing the following program will fail with a NullPointerException:
>  
> {code:java}
> package foo.bar
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.DataTypes
> import org.apache.flink.table.api.Schema
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> object Test {
>   final case class Person(name: String, age: Int)
>   def main(args: Array[String]): Unit = {
>     val ee = StreamExecutionEnvironment.getExecutionEnvironment
>     val te = StreamTableEnvironment.create(ee)    
>     val personSchema = Schema.newBuilder().column("name", 
> DataTypes.STRING()).column("age", DataTypes.INT()).build()
>     val x = ee.fromCollection(List(Person("a", 1)))
>     te.createTemporaryView(
>       "my_table",
>       x,
>       personSchema
>     )
>     val y = ee.fromCollection(List(Person("b", 2)))
>     te.createTemporaryView(
>       "my_table_2",
>       y,
>       personSchema
>     )
>     val res =
>       te.executeSql("""
>                       |WITH A AS (
>                       |  select name, age + 1 from my_table
>                       |),
>                       |B AS (
>                       |  select name, age + 2 from my_table_2
>                       |)
>                       |
>                       |SELECT A.name, B.age
>                       |FROM A
>                       |JOIN B
>                       |FOR SYSTEM_TIME AS OF PROCTIME() on (A.name = B.name)
>                       |""".stripMargin)
>     res.print()
>   }
> }
> {code}
> Stacktrace:
> {code:java}
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSnapshot(SqlValidatorImpl.java:4714)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:986)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3133)
>     at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:117)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3076)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335)
>     at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>     at 
> org.apache.calcite.sql.validate.WithNamespace.validateImpl(WithNamespace.java:57)
>     at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWith(SqlValidatorImpl.java:3744)
>     at org.apache.calcite.sql.SqlWith.validate(SqlWith.java:71)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
>  {code}
> The reason this fails is that SqlValidatorImpl, when validating the 
> SqlSnapshot, always assumes it's operating on a node which has an underlying 
> table directly:
> {code:java}
>             if (!ns.getTable().isTemporal()) {
>                 List<String> qualifiedName = ns.getTable().getQualifiedName();
>                 String tableName = qualifiedName.get(qualifiedName.size() - 
> 1);
>                 throw newValidationError(
>                         snapshot.getTableRef(), 
> Static.RESOURCE.notTemporalTable(tableName));
>             }
>  {code}
> This is not always the case, e.g. when the snapshot is applied to a CTE. 
> A simple fix for this would be first checking `ns.getTable()` against null, 
> and only then checking its temporality.
>  
> The issue here is that this bug is inside Calcite's validator. 
> Would love some guidance on how to fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to