Hi!

I've created a JIRA ticket[1] for this issue. Please check it out and track
the progress there.

[1] https://issues.apache.org/jira/browse/FLINK-23885
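
Until the ticket is resolved, one possible workaround is to declare the row
type explicitly on the map operator, so that the type extractor does not have
to infer it from the lambda. The following is only a sketch, not a confirmed
fix: the class name RowSchemaWorkaround and the helper buildTable are made up
for illustration, and the field types (STRING, INT) are assumed from the
Row.of("Name" + i, i) call in the original program.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical class name, used only for this sketch.
public class RowSchemaWorkaround {

    // Hypothetical helper: builds the table from a dynamically constructed Row stream.
    static Table buildTable(StreamExecutionEnvironment flinkEnv) {
        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        // returns(...) attaches explicit row type information to the map output,
        // replacing whatever the type extractor derived for the lambda.
        // Assumption: field 0 is a String and field 1 is an Integer.
        DataStream<Row> rows = integers
                .map(i -> Row.of("Name" + i, i))
                .returns(Types.ROW(Types.STRING, Types.INT));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
        return tableEnv.fromDataStream(rows);
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment flinkEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With the explicit type information, the schema should list one column
        // per row field instead of a single RAW column.
        buildTable(flinkEnv).printSchema();
    }
}
```

If named columns are preferred over the default field names, Types.ROW_NAMED
can be used in place of Types.ROW.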

Caizhi Weng <tsreape...@gmail.com> wrote on Fri, Aug 20, 2021 at 10:47 AM:

> Hi!
>
> This is because TypeExtractor#getMapReturnTypes does not handle row
> types (see that method and also TypeExtractor#privateGetForClass). You
> might want to open a JIRA ticket for this.
>
> Matthias Broecheler <matth...@dataeng.ai> wrote on Fri, Aug 20, 2021 at 7:01 AM:
>
>> Hey Flinkers,
>>
>> I am trying to follow the docs
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
>> to convert a DataStream to a Table. Specifically, I have a DataStream of Row
>> and want the columns of the row to become the columns of the resulting
>> table.
>>
>> That works but only if I construct the Rows statically. If I construct
>> them dynamically (in a map) then Flink turns the entire Row into one column
>> of type "RAW('org.apache.flink.types.Row', '...')".
>>
>> Does anybody know why this is the case or how to fix it? Take a look at
>> the simple Flink program below where I construct the DataStream "rows" in
>> two different ways. I would expect those to be identical (and the sink does
>> print identical information) but the inferred table schema is different.
>>
>> Thanks a ton,
>> Matthias
>>
>> ------------------------------
>>
>>         StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>         flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>         DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>>
>>         DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>>
>> //  This alternative way of constructing this data stream produces the expected table schema
>> //      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));
>>
>>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
>>         Table table = tableEnv.fromDataStream(rows);
>>         table.printSchema();
>>
>>         rows.addSink(new PrintSinkFunction<>());
>>
>>         flinkEnv.execute();
>>
>>
