Hi, Community:
  I ran into some field name errors when trying to convert between Table and DataStream.
  Flink version: 1.9.1
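
For reference, the snippets below assume env and tableEnv are created roughly like this (a minimal sketch of my setup; the exact configuration is omitted):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)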


First, I initialize a DataStream, register it as the table 'source_table', and register a TableFunction named 'foo':
val sourceStream = env.socketTextStream("127.0.0.1", 8010)
  .map(line => line.toInt)
tableEnv.registerDataStream("source_table", sourceStream, 'a)

class Foo extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}
tableEnv.registerFunction("foo", new Foo)
Then I use sqlQuery to generate a new table t1 with columns 'a' and 'b':
val t1 = tableEnv.sqlQuery(
"""
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
When I convert 't1' to a DataStream and then register it as a new table (for other reasons) also named 't1', the columns change to 'a' and 'f0' instead of 'a' and 'b'.
The 'f0' naming is only mentioned for the Java API in [1].
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream)
/*
new t1 table schema: root
|-- a: INT
|-- f0: INT
 */
println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
Suspecting that the default TypeExtractor(?) does not work well here, I then tried to set the field names explicitly, but that failed too:
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
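
A workaround I am considering (not sure whether it is the intended approach; 't1_tuple' is just a name I made up) is to convert to a concrete tuple type instead of Row, so the field names can be re-assigned on registration:

// sketch only: convert to a (Int, Int) stream instead of Row,
// then register it again with explicit field names
val t1TupleStream = t1.toAppendStream[(Int, Int)]
tableEnv.registerDataStream("t1_tuple", t1TupleStream, 'a, 'b)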
If I add a proctime attribute when registering the source table at the beginning, everything works, but I do not want to add a proctime attribute that is never used:
tableEnv.registerDataStream("source_table", sourceStream, 'a, 
'proctime.proctime)


My questions are:
1. Why does the code above behave like this?
2. How can I retain the field name 'b' when converting back and forth between Table and DataStream?


Refs:
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table


Thanks for your reply.
