[ 
https://issues.apache.org/jira/browse/FLINK-18782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168996#comment-17168996
 ] 

Ying Z commented on FLINK-18782:
--------------------------------

[~jark] Thanks for your advice ^_^

I have already tried that approach too, and it worked.

But I then ran into a new scenario like this:
{code:java}
tableEnv.registerDataStream("source_table", sourceStream, 'a, 'proctime.proctime)

val t1 = tableEnv.sqlQuery(
  """
    |SELECT source_table.*, b FROM source_table
    |, LATERAL TABLE(foo(a)) as T(b)
    |""".stripMargin
)
/*
 t1 table schema: root
 |-- a: INT
 |-- b: INT
 */
println(s"t1 table schema: ${t1.getSchema}")
val t1Stream = t1.toAppendStream[Row](t1.getSchema.toRowType)
{code}
This results in a new error message:

Exception in thread "main" org.apache.flink.table.api.TableException: The time 
indicator type is an internal type only.

 

I understand that this behavior may be undefined by design, but is there a 
way to set the field name manually? I tried the following, but it failed again:
{code:java}
val t1Stream = t1.toAppendStream[Row]
// t1 stream schema: Row(a: Integer, f0: Integer)
println(s"t1 stream schema: ${t1Stream.getType()}")
tableEnv.registerDataStream("t1", t1Stream, 'a, 'b)
{code}
This results in the error message:

Exception in thread "main" org.apache.flink.table.api.ValidationException: b is 
not a field of type Row(a: Integer, f0: Integer). Expected: a, f0}
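
A possible workaround, given only as a sketch that has not been verified 
against 1.9.1: because 'a matches an existing field of the Row type, the field 
list appears to be resolved by name rather than by position, so the generated 
field could be renamed with an alias ('f0 as 'b) instead of listing 'b 
directly. The object name RenameFieldSketch and the fromElements source are 
assumptions made for illustration; the rest follows the snippets in this issue.
{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Same table function as in the issue description
class Foo extends TableFunction[Int] {
  def eval(col: Int): Unit = collect(col * 10)
}

// Hypothetical wrapper object, used only to make the sketch complete
object RenameFieldSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Assumption: a bounded source replaces the socket stream
    val sourceStream = env.fromElements(1, 2, 3)
    tableEnv.registerDataStream("source_table", sourceStream, 'a)
    tableEnv.registerFunction("foo", new Foo)

    val t1 = tableEnv.sqlQuery(
      """
        |SELECT source_table.a, b FROM source_table
        |, LATERAL TABLE(foo(a)) as T(b)
        |""".stripMargin
    )

    val t1Stream = t1.toAppendStream[Row]
    // Name-based mapping: keep 'a and rename the generated 'f0 back to 'b
    tableEnv.registerDataStream("t1", t1Stream, 'a, 'f0 as 'b)
    println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
  }
}
{code}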

 

> Retain the column name when converting a Table to a DataStream
> --------------------------------------------------------------
>
>                 Key: FLINK-18782
>                 URL: https://issues.apache.org/jira/browse/FLINK-18782
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1
>            Reporter: Ying Z
>            Priority: Major
>
> mail: 
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-retain-the-column-name-when-convert-a-Table-to-DataStream-td37002.html]
>  
> I ran into field name errors when converting between a Table and a DataStream.
>  First, create a DataStream, register it as table 'source_table', and register a 
> table function named 'foo':
> {code:java}
> val sourceStream = env.socketTextStream("127.0.0.1", 8010)
>   .map(line => line.toInt)
> tableEnv.registerDataStream("source_table", sourceStream, 'a)
> class Foo() extends TableFunction[(Int)] {
>   def eval(col: Int): Unit = collect((col * 10))
> }
> tableEnv.registerFunction("foo", new Foo)
> {code}
> Then, use sqlQuery to generate a new table t1 with columns 'a' and 'b':
> {code:java}
> val t1 = tableEnv.sqlQuery(
>   """
>     |SELECT source_table.a, b FROM source_table
>     |, LATERAL TABLE(foo(a)) as T(b)
>     |""".stripMargin
> )
> /*
>  t1 table schema: root
>  |-- a: INT
>  |-- b: INT
>  */
> println(s"t1 table schema: ${t1.getSchema}")
> {code}
> When I convert 't1' to a DataStream and then register it as a new table (for 
> some reason) named 't1', the columns change to 'a' and 'f0' instead of 'a' and 'b':
> {code:java}
> val t1Stream = t1.toAppendStream[Row]
> // t1 stream schema: Row(a: Integer, f0: Integer)
> println(s"t1 stream schema: ${t1Stream.getType()}")
> tableEnv.registerDataStream("t1", t1Stream)
> /*
> new t1 table schema: root
> |-- a: INT
> |-- f0: INT
>  */
> println(s"new t1 table schema: ${tableEnv.scan("t1").getSchema}")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
