Thank you, Caizhi, for looking into this and identifying the source of the bug. Is there a way to work around this at the API level until this bug is resolved? Can I somehow "inject" the type?
Thanks a lot for your help, Matthias On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > I've created a JIRA ticket[1] for this issue. Please check it out and > track the progress there. > > [1] https://issues.apache.org/jira/browse/FLINK-23885 > > Caizhi Weng <tsreape...@gmail.com> 于2021年8月20日周五 上午10:47写道: > >> Hi! >> >> This is because TypeExtractor#getMapReturnTypes are not dealing with row >> types (see that method and also TypeExtractor#privateGetForClass). You >> might want to open a JIRA ticket for this. >> >> Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道: >> >>> Hey Flinkers, >>> >>> I am trying to follow the docs >>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api> >>> to >>> convert a DataStream to a Table. Specifically, I have a DataStream of Row >>> and want the columns of the row to become the columns of the resulting >>> table. >>> >>> That works but only if I construct the Rows statically. If I construct >>> them dynamically (in a map) then Flink turns the entire Row into one column >>> of type "RAW('org.apache.flink.types.Row', '...')". >>> >>> Does anybody know why this is the case or how to fix it? Take a look at >>> the simple Flink program below where I construct the DataStream "rows" in >>> two different ways. I would expect those to be identical (and the sink does >>> print identical information) but the inferred table schema is different. >>> >>> Thanks a ton, >>> Matthias >>> >>> ------------------------------ >>> >>> StreamExecutionEnvironment flinkEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); >>> >>> DataStream<Integer> integers = flinkEnv.fromElements(12, 5); >>> >>> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i)); >>> >>> // This alternative way of constructing this data stream produces the >>> expected table schema >>> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), >>> Row.of("Name5", 5)); >>> >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(flinkEnv); >>> Table table = tableEnv.fromDataStream(rows); >>> table.printSchema(); >>> >>> rows.addSink(new PrintSinkFunction<>()); >>> >>> flinkEnv.execute(); >>> >>>