Perfect, that worked. Thanks a lot, JING!
On Sun, Aug 22, 2021 at 1:25 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Matthias,
> Until the bug is fixed, you can specify the return type explicitly in the
> second parameter of the map function:
>
>     DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>
> becomes
>
>     DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i),
>             new RowTypeInfo(Types.STRING, Types.INT));
>
> Best,
> JING ZHANG
>
> Matthias Broecheler <matth...@dataeng.ai> wrote on Sat, Aug 21, 2021 at 12:40 AM:
>
>> Thank you, Caizhi, for looking into this and identifying the source of
>> the bug. Is there a way to work around this at the API level until the
>> bug is resolved? Can I somehow "inject" the type?
>>
>> Thanks a lot for your help,
>> Matthias
>>
>> On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> I've created a JIRA ticket [1] for this issue. Please check it out and
>>> track the progress there.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23885
>>>
>>> Caizhi Weng <tsreape...@gmail.com> wrote on Fri, Aug 20, 2021 at 10:47 AM:
>>>
>>>> Hi!
>>>>
>>>> This is because TypeExtractor#getMapReturnTypes does not handle row
>>>> types (see that method and also TypeExtractor#privateGetForClass). You
>>>> might want to open a JIRA ticket for this.
>>>>
>>>> Matthias Broecheler <matth...@dataeng.ai> wrote on Fri, Aug 20, 2021 at 7:01 AM:
>>>>
>>>>> Hey Flinkers,
>>>>>
>>>>> I am trying to follow the docs
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
>>>>> to convert a DataStream to a Table. Specifically, I have a DataStream
>>>>> of Row and want the columns of the row to become the columns of the
>>>>> resulting table.
>>>>>
>>>>> That works, but only if I construct the Rows statically. If I
>>>>> construct them dynamically (in a map), then Flink turns the entire Row
>>>>> into one column of type "RAW('org.apache.flink.types.Row', '...')".
>>>>>
>>>>> Does anybody know why this is the case or how to fix it? Take a look
>>>>> at the simple Flink program below, where I construct the DataStream
>>>>> "rows" in two different ways. I would expect those to be identical
>>>>> (and the sink does print identical information), but the inferred
>>>>> table schema is different.
>>>>>
>>>>> Thanks a ton,
>>>>> Matthias
>>>>>
>>>>> ------------------------------
>>>>>
>>>>> StreamExecutionEnvironment flinkEnv =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>>>
>>>>> DataStream<Integer> integers = flinkEnv.fromElements(12, 5);
>>>>>
>>>>> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));
>>>>>
>>>>> // This alternative way of constructing the data stream produces the
>>>>> // expected table schema:
>>>>> // DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12),
>>>>> //     Row.of("Name5", 5));
>>>>>
>>>>> StreamTableEnvironment tableEnv =
>>>>>     StreamTableEnvironment.create(flinkEnv);
>>>>> Table table = tableEnv.fromDataStream(rows);
>>>>> table.printSchema();
>>>>>
>>>>> rows.addSink(new PrintSinkFunction<>());
>>>>>
>>>>> flinkEnv.execute();
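
For anyone who finds this thread later: below is a minimal end-to-end sketch
of the workaround, assembled from the snippets above. It assumes Flink 1.13
and the two-argument map(MapFunction, TypeInformation) overload that JING
mentioned; the class name is just for illustration and is not from the
original program.

------------------------------

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowTypeWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment flinkEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        // Pass the row's type information explicitly so the lambda's Row
        // output is not treated as a generic (RAW) type by the TypeExtractor.
        DataStream<Row> rows = integers.map(
                i -> Row.of("Name" + i, i),
                new RowTypeInfo(Types.STRING, Types.INT));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
        Table table = tableEnv.fromDataStream(rows);
        table.printSchema();

        rows.addSink(new PrintSinkFunction<>());

        flinkEnv.execute();
    }
}

If you prefer not to use the two-argument overload, calling
.returns(new RowTypeInfo(Types.STRING, Types.INT)) on the result of map
should have the same effect. Types.ROW_NAMED(new String[]{"name", "number"},
Types.STRING, Types.INT) can also be used in place of RowTypeInfo if you want
named columns in the resulting table schema; I have not verified the exact
column names that produces, so treat it as a hint rather than a guarantee.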