Hi, For now, you can explicitly set the RowTypeInfo to retain the field names. This works in master branch:
* val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)* // t1 stream schema: Row(a: Integer, b: Integer) println(s"t1 stream schema: ${t1Stream.getType()}") tEnv.registerDataStream("t1", t1Stream) /* new t1 table schema: root |-- a: INT |-- b: INT */ println(s"new t1 table schema: ${tEnv.scan("t1").getSchema}") Best, Jark On Fri, 31 Jul 2020 at 18:03, izual <izual...@163.com> wrote: > I create a JIRA issue here, > https://issues.apache.org/jira/browse/FLINK-18782 > And thanks for your advice to avoid 「top-level projection/rename」^_^ > > > > > At 2020-07-30 16:58:45, "Dawid Wysakowicz" <dwysakow...@apache.org> wrote: > > Hi, > > I am afraid you are facing an issue that was not checked for/was not > considered. I think your use case is absolutely valid and should be > supported. > > The problem you are facing as far as I can tell from an initial > investigation is that the top-level projection/rename is not being applied. > Internally the foo(a) is passed around as an unnamed expression and should > be aliased at the top level. This happens when simply querying therefore > you get expected results in the first case when only printing the schema of > a Table. > > When translating to the datastream this final rename does not take place, > which imo is a bug. You can see this behaviour if you add additional > projection. Then the renaming of the expression from lateral table happens > a level deeper and is not stripped. > > val t1 = tableEnv.sqlQuery( > """ > |SELECT 1, * FROM ( > |SELECT source_table.a, b FROM source_table > |, LATERAL TABLE(foo(a)) as T(b)) > |""".stripMargin > > > t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer) > new t1 table schema: root > |-- EXPR$0: INT > |-- a: INT > |-- b: INT > > > Do you mind creating a JIRA issue to fix/support this case? > > Unfortunately I can not think of a really good way how you could retain > the column names. :( > > Best, > > Dawid > On 28/07/2020 10:26, izual wrote: > > Hi, Community: > I met some field name errors when try to convert in Table and DataStream. > flink version: 1.9.1 > > First, init a datastream and convert to table 'source', register a > tablefunction named 'foo' > > val sourceStream = env.socketTextStream("127.0.0.1", 8010) > .map(line => line.toInt)tableEnv.registerDataStream("source_table", > sourceStream, 'a) > class Foo() extends TableFunction[(Int)] { > def eval(col: Int): Unit = collect((col * 10)) > } > tableEnv.registerFunction("foo", new Foo) > > Then, use sqlQuery to generate a new table t1 with columns 'a' 'b' > > val t1 = tableEnv.sqlQuery( > """ |SELECT source_table.a, b FROM source_table |, LATERAL > TABLE(foo(a)) as T(b) |""".stripMargin > )/* t1 table schema: root |-- a: INT |-- b: INT */println(s"t1 table schema: > ${t1.getSchema}") > > When I try to convert 't1' to a datastream then register to a new > table(for some reason) named 't1', the columns changes to 'a' 'f0', not 'a' > 'b' > I can find 'f0' only with the Java-API in Refs-1. > > val t1Stream = t1.toAppendStream[Row]// t1 stream schema: Row(a: Integer, f0: > Integer)println(s"t1 stream schema: > ${t1Stream.getType()}")tableEnv.registerDataStream("t1", t1Stream)/*new t1 > table schema: root|-- a: INT|-- f0: INT */println(s"new t1 table schema: > ${tableEnv.scan("t1").getSchema}") > > Consider maybe the default TypeExtractor(?) works not very well here, Then > I try to set the field name explicitly, but failed too. > > tableEnv.registerDataStream("t1", t1Stream, 'a, 'b) > > If I add a proctime at first, this works well, but I do not want to set a > proctime which is unused. > > tableEnv.registerDataStream("source_table", sourceStream, 'a, > 'proctime.proctime) > > > And my question is : > 1. why the behavior of the code above seems a little strange? > 2. How to retain the 'b' when convert with table and stream frequently? > > Refs: > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table > > Thanks for ur reply. > > > > > > > >