Ying Z created FLINK-18782:
------------------------------

             Summary: How to retain the column name when converting a Table to a DataStream
                 Key: FLINK-18782
                 URL: https://issues.apache.org/jira/browse/FLINK-18782
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.1
            Reporter: Ying Z

mail: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]

I ran into wrong field names when converting between a Table and a DataStream.

First, I create a DataStream, register it as the table 'source_table', and register a table function named 'foo':
{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// env (StreamExecutionEnvironment) and tableEnv (StreamTableEnvironment) are created beforehand.
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

// Table function that emits one row per input value: the value multiplied by 10.
class Foo() extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}
tableEnv.registerFunction("foo", new Foo)
{code}

Then I use sqlQuery to generate a new table t1 with columns 'a' and 'b':
{code:java}
val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
  |-- a: INT
  |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
{code}

When I convert 't1' to a DataStream and then register that stream as a new table also named 't1' (which I need for other reasons), the columns change to 'a' and 'f0' instead of 'a' and 'b':
{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
 new t1 table schema: root
  |-- a: INT
  |-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)