Hi Andres,

In that case you should use the `flatMap` method instead of `map`. `flatMap` lets you emit multiple elements for each input element and collects them all into one DS. This works even if your DS<String> contains multiple elements.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Each input string may hold several records separated by '|'.
        DataStream<String> dsString =
            env.fromElements("1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4");
        DataStream<Object> dsTuple =
            dsString.flatMap(new FlatMapFunction<String, Object>() {
                @Override
                public void flatMap(String value, Collector<Object> out)
                        throws Exception {
                    // Emit one tuple per record; its arity follows the field count.
                    for (String record : value.split("\\|")) {
                        String[] split = record.split(",");
                        if (split.length == 2) {
                            out.collect(new Tuple2<>(
                                Integer.valueOf(split[0]), split[1]));
                        } else if (split.length == 3) {
                            out.collect(new Tuple3<>(
                                Integer.valueOf(split[0]), split[1],
                                Double.valueOf(split[2])));
                        } else {
                            out.collect(new Tuple4<>(
                                Integer.valueOf(split[0]), split[1],
                                Double.valueOf(split[2]), Long.valueOf(split[3])));
                        }
                    }
                }
            });

        dsTuple.print();
        env.execute();
    }

On Wed, Jul 24, 2019 at 11:47 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:

> Hello Weng,
>
> This definitely helps a lot. However, I know my initial DS has a single
> row of content, so in theory I would just create a DS, which is what I need.
> That is why I need to know how to create a new environment DS within a map
> function.
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi Andres,
>>
>> Thanks for the detailed explanation.
>>
>> but apparently I can't create a new DS within a map function
>>
>> If you create a new DS within the map function, then you'll create as
>> many DSs as the number of elements in the old DS which... doesn't seem to
>> be your desired situation? I suppose you want to create a DS<Tuple> from
>> DS<String>.
>> If that is the case you can write something like this:
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<String> dsString =
>>             env.fromElements("1,a,1.1", "2,b,2.2,-2", "3,c", "4,d,4.4");
>>         DataStream<Object> dsTuple = dsString.map(s -> {
>>             String[] split = s.split(",");
>>             if (split.length == 2) {
>>                 return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
>>             } else if (split.length == 3) {
>>                 return new Tuple3<>(Integer.valueOf(split[0]), split[1],
>>                     Double.valueOf(split[2]));
>>             } else {
>>                 return new Tuple4<>(Integer.valueOf(split[0]), split[1],
>>                     Double.valueOf(split[2]), Long.valueOf(split[3]));
>>             }
>>         });
>>
>>         dsTuple.print();
>>         env.execute();
>>     }
>>
>> How dynamically create the DS<Tuple>
>>
>> As you can see in the above code, I did not create a DS<Tuple> but a
>> DS<Object>, because Tuple can't be directly used. It seems that you want to
>> turn this new DS into a table, but if different records have different
>> numbers of columns this is not good practice, as the schema of each record
>> is not the same (as a workaround, you can fill a column with null if
>> a record doesn't have it).
>>
>> Hope this solves your problem. If you have any other problems feel free
>> to write back.
>>
>> On Wed, Jul 24, 2019 at 10:50 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Let me properly list the questions I have:
>>>
>>> * How to capture the content of a DataStream in a string? On this
>>> point, I basically have a DS<String>, and the only ways I can use its
>>> content are within a map function, print, storing the content somewhere, or
>>> SQL queries.
>>> The point is that I need the content because, depending on that
>>> content, I need to create another DS and later register it as a Table
>>> environment, which means I need the values but also the headers,
>>> and all of that info is within the DS<String>. The first option I had
>>> was to play with the map function, but apparently I can't create a new DS
>>> within a map function, let alone register it as a new table environment.
>>>
>>> My second option here could be to create a sort of public variable
>>> to store the DS<String> content and then create my UDF, but sadly this is
>>> not allowed either. My options in this case would be to somehow store
>>> the content of the DS<String> publicly in a new variable, turn the
>>> DS<String> into a String, or store the content in a file, read the file, and
>>> start over parsing the content to provide the header and content for the new
>>> DS.
>>>
>>> * How to dynamically create the DS<Tuple>: basically, after parsing the
>>> point above I might end up with an array of fields, sometimes 4, 3 or 2, it
>>> doesn't matter, so I might need to switch between different tuples or turn my
>>> content into Row to create a DS<Row>.
>>>
>>> I'm looking forward to reading your comments.
>>>
>>> thanks so much
>>>
>>> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng <tsreape...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andres,
>>>>
>>>> Sorry, I can't quite get your question... Do you mean how to split
>>>> the string into fields?
>>>>
>>>> There is a `split` method in Java. You can give it a regexp and it will
>>>> return an array containing all the split fields.
>>>>
>>>> On Wed, Jul 24, 2019 at 10:28 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>>>
>>>>> Hello Weng,
>>>>>
>>>>> thanks for your reply. However, I'm struggling to somehow read the
>>>>> content of my DS, with the payload that defines how many fields the message
>>>>> contains, into a String. That is the reason why I thought of a map
>>>>> function for that DS.
>>>>>
>>>>> The Tuple part can change over time; it can even go from 3 or 4 fields
>>>>> to 2, and it can keep changing. How could I approach this challenge?
>>>>>
>>>>> thanks so much
>>>>>
>>>>> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng <tsreape...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andres,
>>>>>>
>>>>>> Are the payloads strings? If yes, one method is to store
>>>>>> them as strings and process them further with user-defined functions
>>>>>> when you need to use them.
>>>>>>
>>>>>> Another method is to store them in arrays.
>>>>>>
>>>>>> Also, if the types of the first 3 fields are the same for the first
>>>>>> and second payload, you can use a Tuple4<> and set the last element to
>>>>>> null for the first payload.
>>>>>>
>>>>>> On Wed, Jul 24, 2019 at 10:09 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> I need to dynamically set the size of the Tuple that feeds a DS;
>>>>>>> let me explain it better. Let's assume the first payload I read has this
>>>>>>> format "field1,field2,field3", so this might require a Tuple3<>, but my
>>>>>>> payload later can be "field1,field2,field3,field4", so my Tuple might
>>>>>>> need to be redefined on the fly and now be a Tuple4<>.
>>>>>>>
>>>>>>> How could I create this dynamically, any idea?
>>>>>>>
>>>>>>> Thanks so much
>>>>>>>
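
The null-padding workaround suggested in the replies above (a fixed number of columns, with null for fields a record does not have) can be sketched in plain Java, independent of Flink. This is only a minimal illustration under stated assumptions: the class name `FixedWidthRecords`, the method `parse`, and the `width` parameter are hypothetical and do not come from the thread or from any Flink API. The `|` and `,` separators are taken from the example data in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: it only illustrates the
// null-padding idea on plain strings.
final class FixedWidthRecords {

    // Splits a payload such as "1,a,1.1|2,b,2.2,-2" into records and pads
    // each record with nulls so that every row has exactly `width` columns.
    static List<String[]> parse(String payload, int width) {
        List<String[]> rows = new ArrayList<>();
        for (String record : payload.split("\\|")) {
            String[] fields = record.split(",");
            if (fields.length > width) {
                throw new IllegalArgumentException(
                    "record has " + fields.length
                        + " fields, expected at most " + width);
            }
            // Arrays.copyOf fills the extra trailing slots with null.
            rows.add(Arrays.copyOf(fields, width));
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : parse("1,a,1.1|2,b,2.2,-2", 4)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Inside a `flatMap`, each padded array could then be turned into one fixed-arity type (for example a Tuple4 or a Row), so that every record shares the same schema before the stream is registered as a table. Type conversion of the individual fields is left out of this sketch.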