Hi! I've created a JIRA ticket[1] for this issue. Please check it out and track the progress there.
[1] https://issues.apache.org/jira/browse/FLINK-23885 Caizhi Weng <tsreape...@gmail.com> 于2021年8月20日周五 上午10:47写道: > Hi! > > This is because TypeExtractor#getMapReturnTypes are not dealing with row > types (see that method and also TypeExtractor#privateGetForClass). You > might want to open a JIRA ticket for this. > > Matthias Broecheler <matth...@dataeng.ai> 于2021年8月20日周五 上午7:01写道: > >> Hey Flinkers, >> >> I am trying to follow the docs >> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api> >> to >> convert a DataStream to a Table. Specifically, I have a DataStream of Row >> and want the columns of the row to become the columns of the resulting >> table. >> >> That works but only if I construct the Rows statically. If I construct >> them dynamically (in a map) then Flink turns the entire Row into one column >> of type "RAW('org.apache.flink.types.Row', '...')". >> >> Does anybody know why this is the case or how to fix it? Take a look at >> the simple Flink program below where I construct the DataStream "rows" in >> two different ways. I would expect those to be identical (and the sink does >> print identical information) but the inferred table schema is different. >> >> Thanks a ton, >> Matthias >> >> ------------------------------ >> >> StreamExecutionEnvironment flinkEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); >> >> DataStream<Integer> integers = flinkEnv.fromElements(12, 5); >> >> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i)); >> >> // This alternative way of constructing this data stream produces the >> expected table schema >> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), >> Row.of("Name5", 5)); >> >> StreamTableEnvironment tableEnv = >> StreamTableEnvironment.create(flinkEnv); >> Table table = tableEnv.fromDataStream(rows); >> table.printSchema(); >> >> rows.addSink(new PrintSinkFunction<>()); >> >> flinkEnv.execute(); >> >>