[ 
https://issues.apache.org/jira/browse/FLINK-18750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17167046#comment-17167046
 ] 

JinxinTang commented on FLINK-18750:
------------------------------------

This looks interesting, and the reproduction code is helpful. I will look into this too.

> SqlValidatorException thrown when select from a view which contains a UDTF 
> call
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-18750
>                 URL: https://issues.apache.org/jira/browse/FLINK-18750
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.1
>            Reporter: Wei Zhong
>            Priority: Major
>
> When executing the following code:
>  
> {code:java}
> package com.example;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TableFunction;
> public class TestUTDF {
>    public static class UDTF extends TableFunction<String> {
>       public void eval(String input) {
>          collect(input);
>       }
>    }
>    public static void main(String[] args) {
>       StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>          env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
>       tEnv.createTemporarySystemFunction("udtf", new UDTF());
>       tEnv.createTemporaryView("source", tEnv.fromValues("a", "b", 
> "c").as("f0"));
>       String udtfCall = "SELECT S.f0, T.f1 FROM source as S, LATERAL 
> TABLE(udtf(f0)) as T(f1)";
>       System.out.println(tEnv.explainSql(udtfCall));
>       String createViewCall = "CREATE VIEW tmp_view AS" + udtfCall;
>       tEnv.executeSql(createViewCall);
>       System.out.println(tEnv.from("tmp_view").explain());
>    }
> }
> {code}
> The following SqlValidatorException is thrown:
>  
>  
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(f0=[$0], 
> f1=[$1])+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
> requiredColumns=[{0}])   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])   :  
> +- LogicalValues(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' 
> }]])   +- LogicalTableFunctionScan(invocation=[udtf($cor0.f0)], 
> rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
> == Optimized Logical Plan ==Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER])+- Calc(select=[f0])   +- Values(type=[RecordType(CHAR(1) 
> f0)], tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
> == Physical Execution Plan ==Stage 1 : Data Source content : Source: 
> Values(tuples=[[{ _UTF-16LE'a' }, { _UTF-16LE'b' }, { _UTF-16LE'c' }]])
>  Stage 2 : Operator content : Calc(select=[f0]) ship_strategy : FORWARD
>  Stage 3 : Operator content : Correlate(invocation=[udtf($cor0.f0)], 
> correlate=[table(udtf($cor0.f0))], select=[f0,EXPR$0], 
> rowType=[RecordType(CHAR(1) f0, VARCHAR(2147483647) EXPR$0)], 
> joinType=[INNER]) ship_strategy : FORWARDException in thread "main" 
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 4, column 14 to line 4, column 17: Column 'f0' not found in any table at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl$ToRelContextImpl.expandView(FlinkPlannerImpl.scala:204)
>  at org.apache.calcite.plan.ViewExpanders$1.expandView(ViewExpanders.java:52) 
> at 
> org.apache.flink.table.planner.catalog.SqlCatalogViewTable.convertToRel(SqlCatalogViewTable.java:58)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.expand(ExpandingPreparingTable.java:59)
>  at 
> org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable.toRel(ExpandingPreparingTable.java:55)
>  at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:533)
>  at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1044) at 
> org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068) at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:349)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:152)
>  at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:149)
>  at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
>  at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
>  at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
>  at 
> org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:165)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:82)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:654)
>  at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:575) 
> at com.example.TestUTDF.main(TestUTDF.java:39)Caused by: 
> org.apache.calcite.runtime.CalciteContextException: From line 4, column 14 to 
> line 4, column 17: Column 'f0' not found in any table at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) 
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>  at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
>  at 
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:609) 
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:236) at 
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>  at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>  at 
> org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>  at 
> org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>  at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>  ... 31 moreCaused by: org.apache.calcite.sql.validate.SqlValidatorException: 
> Column 'f0' not found in any table at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) 
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) ... 72 
> more{code}
>  
>  After some debugging, I think this is because the keyword "LATERAL" is 
> discarded after the CREATE VIEW operation is executed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to