liuhong shared an issue with you ---------------------------------
> 动态查询sql后,使用toAppendStream将动态表转化为流时报错,org.apache.flink.table.api.TableException > ------------------------------------------------------------------------------ > > Key: FLINK-23603 > URL: https://issues.apache.org/jira/browse/FLINK-23603 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.13.1 > Environment: > {code:java} > pom.xml > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>1.13.1</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java_2.12</artifactId> > <version>1.13.1</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_2.12</artifactId> > <version>1.13.1</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-planner-blink_2.12</artifactId> > <version>1.13.1</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-scala_2.12</artifactId> > <version>1.13.1</version> > <scope>provided</scope> > </dependency> > {code} > {code:java} > import com.atguigu.chapter05.bean.Water1; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import static org.apache.flink.table.api.Expressions.$; > public class Flink08_Time_ProcessingTime_DDL { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > tEnv.executeSql("create table sensor(" + > "id string," + > "ts bigint," + > "vc int" + > //"pt as proctime()" + > ") with (" + > " 'connector' = 'filesystem' ," + > " 'path' = 'input/water.txt' ," + > " 'format' = 'csv' " + > ")"); > //tEnv.sqlQuery("select * from sensor").execute().print(); > //Table t1 = tEnv.sqlQuery("select id,ts,vc hight from sensor"); > Table t1 = tEnv.from("sensor"); > Table t2 = t1.select($("id"), $("ts"),$("vc").as("height")); > /*t2.execute().print(); > t2.printSchema();*/ > tEnv.toAppendStream(t2, Water1.class).print(); > env.execute(); > } > } > {code} > {code:java} > import lombok.AllArgsConstructor; > import lombok.Data; > import lombok.NoArgsConstructor; > @Data > @NoArgsConstructor > @AllArgsConstructor > public class Water1 { > private String id; > private Long ts; > private Integer height; > } > {code} > {panel:title=water.txt} > sensor_1,1,1 > sensor_1,2,2 > sensor_2,3,45 > sensor_1,4,4 > sensor_2,6,9 > sensor_1,7,6 > sensor_3,8,7 > {panel} > > > > Reporter: liuhong > Priority: Major > > 当执行环境中Flink08_Time_ProcessingTime_DDL.main时会抛出以下异常,如果在Flink08_Time_ProcessingTime_DDL中修改 > Table t2 = t1.select($("id"), > $("ts"),{color:#de350b}$("vc").as("height")){color};为 > Table t2 = t1.select($("id"),{color:#de350b}$("vc").as("height"){color}, > $("ts"));则正常输出结果 > Exception in thread "main" org.apache.flink.table.api.TableException: height > is not found in id, ts, vcException in thread "main" > org.apache.flink.table.api.TableException: height is not found in id, ts, vc > at > org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1(SinkCodeGenerator.scala:83) > at > org.apache.flink.table.planner.codegen.SinkCodeGenerator$.$anonfun$generateRowConverterOperator$1$adapted(SinkCodeGenerator.scala:79) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:194) at > scala.collection.TraversableLike.map(TraversableLike.scala:233) at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) at > scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:194) at > org.apache.flink.table.planner.codegen.SinkCodeGenerator$.generateRowConverterOperator(SinkCodeGenerator.scala:79) > at > org.apache.flink.table.planner.codegen.SinkCodeGenerator.generateRowConverterOperator(SinkCodeGenerator.scala) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:190) > at > org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:141) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) > at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at > scala.collection.Iterator.foreach(Iterator.scala:937) at > scala.collection.Iterator.foreach$(Iterator.scala:937) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at > scala.collection.IterableLike.foreach(IterableLike.scala:70) at > scala.collection.IterableLike.foreach$(IterableLike.scala:69) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike.map(TraversableLike.scala:233) at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:439) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:511) > at > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:501) > at > com.atguigu.chapter11.Flink08_Time_ProcessingTime_DDL.main(Flink08_Time_ProcessingTime_DDL.java:36) -- This message was sent by Atlassian Jira (v8.3.4#803005)