[ https://issues.apache.org/jira/browse/FLINK-18782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168996#comment-17168996 ]
Ying Z commented on FLINK-18782:
--------------------------------

[~jark] Thanks for your advice ^_^
I have already tried that approach too, and it worked.

But then I ran into a new scenario like this:
{code:java}
tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.*, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
{code}
which results in a new error message:
Exception in thread "main" org.apache.flink.table.api.TableException: The time indicator type is an internal type only.

I understand that this behavior may be undefined by design, but is there a way to set the field name manually? I tried the following, but it failed again:
{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
{code}
which results in the error message:
Exception in thread "main" org.apache.flink.table.api.ValidationException: b is not a field of type Row(a: Integer, f0: Integer). Expected: a, f0}


> Retain the column name when converting a Table to a DataStream
> --------------------------------------------------------------
>
>                 Key: FLINK-18782
>                 URL: https://issues.apache.org/jira/browse/FLINK-18782
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1
>            Reporter: Ying Z
>            Priority: Major
>
> mail:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]
>
> I ran into field name errors when converting between a Table and a DataStream.
> First, create a DataStream, register it as the table 'source_table', and
> register a table function named 'foo':
> {code:java}
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)
> tableEnv.registerDataStream("source_table", sourceStream, 'a)
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
> {code}
> Then, use sqlQuery to generate a new table t1 with columns 'a' and 'b':
> {code:java}
> val t1 = tableEnv.sqlQuery(
>   """
>     |SELECT source_table.a, b FROM source_table
>     |, LATERAL TABLE(foo(a)) as T(b)
>     |""".stripMargin
> )
> /*
>  t1 table schema: root
>  |-- a: INT
>  |-- b: INT
>  */
> println(s"t1 table schema: ${t1.getSchema}")
> {code}
> When I convert 't1' to a DataStream and then register that stream as a new
> table (for some reason) named 't1', the columns change to 'a' and 'f0'
> instead of 'a' and 'b':
> {code:java}
> val t1Stream = t1.toAppendStream[Row]
> // t1 stream schema: Row(a: Integer, f0: Integer)
> println(s"t1 stream schema: ${t1Stream.getType()}")
> tableEnv.registerDataStream("t1", t1Stream)
> /*
>  new t1 table schema: root
>  |-- a: INT
>  |-- f0: INT
>  */
> println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)