I forgot to include the user mailing list in my response. I'm adding it now in case other users want to solve this problem too.

Soheil Pourbafrani <soheil.i...@gmail.com> wrote on Mon, Jul 15, 2019 at 2:56 AM:

> Great!
> I got it
>
> Thanks
>
> On Sun, Jul 14, 2019 at 8:26 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi Soheil,
>>
>> From what I understand, `Row` is not strongly typed, and Flink currently
>> does not treat `RowTypeInfo` specially when extracting the return types of
>> map functions. So one can't simply pass a map function to `dataset.map()`
>> and expect the result to carry a strongly typed `RowTypeInfo`.
>>
>> But if your mapping function implements the `ResultTypeQueryable`
>> interface, Flink can get the result type directly from its
>> `getProducedType` method (see `getUnaryOperatorReturnType` in
>> `TypeExtractor`). So you can write the following code to give Flink a hint
>> about the return type of your mapping function.
>>
>> @Test
>> public void myTest() {
>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>   BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, config());
>>
>>   DataSet<Row> dataSet = env.fromElements(
>>     Row.of(1, 1L, "a"),
>>     Row.of(2, 2L, "b"),
>>     Row.of(3, 3L, "c")
>>   );
>>   System.out.println(dataSet.getType());
>>
>>   DataSet<Row> ds = dataSet.map(row -> Row.of(row.getField(0), row.getField(1)));
>>   System.out.println(ds.getType());
>>
>>   DataSet<Row> ds2 = dataSet.map(new MyMappingFunc(dataSet));
>>   System.out.println(ds2.getType());
>> }
>>
>> private class MyMappingFunc implements MapFunction<Row, Row>, ResultTypeQueryable<Row> {
>>
>>   private RowTypeInfo producedType;
>>
>>   public MyMappingFunc(DataSet<Row> dataSet) {
>>     RowTypeInfo ti = (RowTypeInfo) dataSet.getType();
>>     producedType = new RowTypeInfo(ti.getTypeAt(0), ti.getTypeAt(1));
>>   }
>>
>>   @Override
>>   public TypeInformation<Row> getProducedType() {
>>     return producedType;
>>   }
>>
>>   @Override
>>   public Row map(Row value) throws Exception {
>>     return Row.of(value.getField(0), value.getField(1));
>>   }
>> }
>>
>> You can see that the type of `ds2` will be the desired
>> Row(f0: Integer, f1: Long).
>>
>> This is the method I can currently think of. Maybe a simpler one exists;
>> I'll ask the other Flink developers tomorrow. Maybe the developers would
>> like to treat `RowTypeInfo` specially when deriving the return types of
>> mapping functions in the future.
>>
>> Soheil Pourbafrani <soheil.i...@gmail.com> wrote on Sun, Jul 14, 2019 at 8:22 PM:
>>
>>> Hi Caizhi,
>>> thanks for your reply.
>>>
>>> Maybe I didn't elaborate enough. I think I'm familiar with that method.
>>> My problem is this: suppose you have a dataset with the schema
>>> (id: Integer, name: String, register_date: Date). If I try to map it like
>>> the following:
>>>
>>> DataSet<Row> res = dataset.map(row -> {
>>>   Row temp = Row.of(
>>>     row.getField(0),
>>>     row.getField(1)
>>>   );
>>>
>>>   return temp;
>>> });
>>>
>>> then the type info of res will be:
>>>
>>> System.out.println(res.getType());
>>>
>>> GenericType<org.apache.flink.types.Row>
>>>
>>> But I want the type info to show
>>> Row(id: Integer, name: String)
>>>
>>> Is there any way to use the schema of the dataset in the map operation to
>>> generate a new dataset?
>>>
>>> On Sun, Jul 14, 2019 at 6:44 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi Soheil,
>>>>
>>>> There is the `toDataSet(Table, TypeInformation<?>)` method in
>>>> `BatchTableEnvironment`, to which you can pass your custom RowTypeInfo.
>>>> Note that the TypeInformation you pass to it must match the field types
>>>> of the table.
>>>>
>>>> Soheil Pourbafrani <soheil.i...@gmail.com> wrote on Sun, Jul 14, 2019 at 5:03 AM:
>>>>
>>>>> Hi
>>>>>
>>>>> When creating a new DataSet of type Row, how can I specify the
>>>>> RowTypeInfo of the row?
>>>>>
>>>>> For example, when I create a new dataset like the following:
>>>>>
>>>>> Row row = Row.of(1, new Timestamp(1), new Date(1));
>>>>> System.out.println(env.fromElements(row).getType());
>>>>>
>>>>> it results in:
>>>>> Row(f0: Integer, f1: Timestamp, f2: Date)
>>>>>
>>>>> While Flink automatically uses f0, f1, ... for labeling, I want to
>>>>> specify the label name of each element.
>>>>>
>>>>> For example, when I create a new DataSet from a Table, it contains the
>>>>> Table schema too:
>>>>>
>>>>> DataSet res = tEnv.toDataSet(table, Row.class);
>>>>>
>>>>> Row(id: Integer, time: Timestamp, name: String, age: Integer, grade: Double)
>>>>>
>>>>> So is there any way to create a new DataSet<Row> with a custom
>>>>> RowTypeInfo?
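
For readers skimming the thread, here is a rough, Flink-free sketch of the idea behind the `ResultTypeQueryable` answer: the framework first asks the function itself for its result type and only falls back to a generic type when the function cannot say. Everything in it is a hypothetical stand-in written for illustration (`ProducedTypeAware`, `RowMapper`, `KeepFirstTwo`, `resultTypeOf`, rows modeled as `Object[]`, types modeled as strings); it is not Flink's actual `TypeExtractor` logic, only a simplified picture of the behavior described in the thread.

```java
public class ProducedTypeSketch {

    /** Hypothetical stand-in for ResultTypeQueryable: the function reports its own result type. */
    public interface ProducedTypeAware {
        String producedType();
    }

    /** Hypothetical stand-in for MapFunction<Row, Row>; a row is modeled as Object[]. */
    public interface RowMapper {
        Object[] map(Object[] row);
    }

    /** Keeps the first two fields and derives its result type from the input field types. */
    public static final class KeepFirstTwo implements RowMapper, ProducedTypeAware {
        private final String producedType;

        public KeepFirstTwo(String[] inputFieldTypes) {
            // Mirrors the thread's new RowTypeInfo(ti.getTypeAt(0), ti.getTypeAt(1)).
            this.producedType =
                "Row(f0: " + inputFieldTypes[0] + ", f1: " + inputFieldTypes[1] + ")";
        }

        @Override
        public String producedType() {
            return producedType;
        }

        @Override
        public Object[] map(Object[] row) {
            return new Object[] {row[0], row[1]};
        }
    }

    /** Prefers the type declared by the function; otherwise falls back to a generic type. */
    public static String resultTypeOf(RowMapper mapper) {
        if (mapper instanceof ProducedTypeAware) {
            return ((ProducedTypeAware) mapper).producedType();
        }
        // A plain lambda carries no field-level type information.
        return "GenericType<Row>";
    }

    public static void main(String[] args) {
        String[] inputTypes = {"Integer", "Long", "String"};

        RowMapper lambda = row -> new Object[] {row[0], row[1]};
        System.out.println(resultTypeOf(lambda));

        System.out.println(resultTypeOf(new KeepFirstTwo(inputTypes)));
    }
}
```

The lambda case corresponds to `ds` in the thread (a generic type), and the `KeepFirstTwo` case corresponds to `ds2`, where the function carries its own type description.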