Hi,

For now, you can explicitly set the RowTypeInfo to retain the field names.
This works in master branch:

    val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
    // t1 stream schema: Row(a: Integer, b: Integer)
    println(s"t1 stream schema: ${t1Stream.getType()}")
    tEnv.registerDataStream("t1", t1Stream)
    /*
    new t1 table schema: root
    |-- a: INT
    |-- b: INT
     */
    println(s"new t1 table schema: ${tEnv.scan("t1").getSchema}")
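
For reference, a slightly fuller sketch of this workaround. The environment setup and the inlined query here are my own illustrative additions, not from the thread; the key line is passing `t1.getSchema.toRowType` explicitly to `toAppendStream`:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// ... register source_table and the foo TableFunction as in the original mail ...
val t1 = tEnv.sqlQuery(
  """
    |SELECT source_table.a, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin)

// Supplying the table's own row type (instead of relying on the generic
// implicit TypeInformation for Row) carries the field names a, b into the
// stream's type, so re-registering the stream keeps them instead of
// falling back to f0, f1, ...
val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
tEnv.registerDataStream("t1", t1Stream)
```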


Best,
Jark

On Fri, 31 Jul 2020 at 18:03, izual <izual...@163.com> wrote:

> I create a JIRA issue here,
> https://issues.apache.org/jira/browse/FLINK-18782
> And thanks for your advice to avoid 「top-level projection/rename」^_^
>
>
>
>
> At 2020-07-30 16:58:45, "Dawid Wysakowicz" <dwysakow...@apache.org> wrote:
>
> Hi,
>
> I am afraid you are facing an issue that was not checked for/was not
> considered. I think your use case is absolutely valid and should be
> supported.
>
> The problem you are facing, as far as I can tell from an initial
> investigation, is that the top-level projection/rename is not being applied.
> Internally the foo(a) is passed around as an unnamed expression and should
> be aliased at the top level. This aliasing does happen when simply querying,
> which is why you get the expected results in the first case, when only
> printing the schema of a Table.
>
> When translating to the datastream this final rename does not take place,
> which imo is a bug. You can see this behaviour if you add an additional
> projection: the renaming of the expression from the lateral table then
> happens a level deeper and is not stripped.
>
>     val t1 = tableEnv.sqlQuery(
>       """
>         |SELECT 1, * FROM (
>         |SELECT source_table.a, b FROM source_table
>         |, LATERAL TABLE(foo(a)) as T(b))
>         |""".stripMargin
>     )
>
>
>     t1 stream schema: Row(EXPR$0: Integer, a: Integer, b: Integer)
>     new t1 table schema: root
>      |-- EXPR$0: INT
>      |-- a: INT
>      |-- b: INT
>
>
> Do you mind creating a JIRA issue to fix/support this case?
>
> Unfortunately I can not think of a really good way how you could retain
> the column names. :(
>
> Best,
>
> Dawid
> On 28/07/2020 10:26, izual wrote:
>
> Hi, Community:
>   I met some field name errors when trying to convert between Table and DataStream.
>   flink version: 1.9.1
>
> First, init a DataStream and register it as table 'source_table', then
> register a TableFunction named 'foo':
>
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)
> tableEnv.registerDataStream("source_table", sourceStream, 'a)
>
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
>
> Then, use sqlQuery to generate a new table t1 with columns 'a' 'b'
>
> val t1 = tableEnv.sqlQuery(
>   """
>     |SELECT source_table.a, b FROM source_table
>     |, LATERAL TABLE(foo(a)) as T(b)
>     |""".stripMargin
> )
>
> /* t1 table schema: root
>  |-- a: INT
>  |-- b: INT
>  */
> println(s"t1 table schema: ${t1.getSchema}")
>
> When I try to convert 't1' to a datastream and then register it as a new
> table (for some reason) named 't1', the columns change to 'a' 'f0', not
> 'a' 'b'. I can find 'f0' only in the Java API section of Refs-1.
>
> val t1Stream = t1.toAppendStream[Row]
>
> // t1 stream schema: Row(a: Integer, f0: Integer)
> println(s"t1 stream schema: ${t1Stream.getType()}")
>
> tableEnv.registerDataStream("t1", t1Stream)
>
> /* new t1 table schema: root
>  |-- a: INT
>  |-- f0: INT
>  */
> println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
>
> Considering that maybe the default TypeExtractor(?) does not work well
> here, I then tried to set the field names explicitly, but that failed too.
>
> tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
>
> If I add a proctime at first, this works well, but I do not want to set a
> proctime that is unused.
>
> tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)
>
>
> And my questions are:
> 1. Why does the behavior of the code above seem a little strange?
> 2. How can I retain 'b' when converting between table and stream frequently?
>
> Refs:
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#register-a-datastream-or-dataset-as-table
>
> Thanks for your reply.
>
