Hi, Community:

I ran into some field name errors when converting between Table and DataStream. Flink version: 1.9.1
First, I initialize a DataStream, register it as the table 'source_table', and register a TableFunction named 'foo':

    val sourceStream = env.socketTextStream("127.0.0.1", 8010)
      .map(line => line.toInt)
    tableEnv.registerDataStream("source_table", sourceStream, 'a)

    class Foo() extends TableFunction[(Int)] {
      def eval(col: Int): Unit = collect((col * 10))
    }
    tableEnv.registerFunction("foo", new Foo)

Then I use sqlQuery to generate a new table t1 with columns 'a' and 'b':

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b)
        |""".stripMargin
    )
    /*
     t1 table schema: root
     |-- a: INT
     |-- b: INT
     */
    println(s"t1 table schema: ${t1.getSchema}")

When I convert 't1' to a DataStream and then register it as a new table named 't1' (which I need to do for other reasons), the columns change to 'a' and 'f0' instead of 'a' and 'b'. The only mention of 'f0' I can find is for the Java API in Ref [1].

    val t1Stream = t1.toAppendStream[Row]
    // t1 stream schema: Row(a: Integer, f0: Integer)
    println(s"t1 stream schema: ${t1Stream.getType()}")
    tableEnv.registerDataStream("t1", t1Stream)
    /*
     new t1 table schema: root
     |-- a: INT
     |-- f0: INT
     */
    println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")

Thinking that the default TypeExtractor(?) might not work well here, I then tried to set the field names explicitly, but that failed too:

    tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)

If I add a proctime attribute when registering the source table, everything works, but I do not want to add a proctime that is otherwise unused:

    tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

My questions are:

1. Why does the code above behave this way?
2. How can I retain the field name 'b' when converting between Table and DataStream repeatedly?

Refs:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table

Thanks for your reply.