Hi Andres,

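In that case you should use the `flatMap` method instead of `map`. `flatMap`
lets you emit any number of elements (zero, one, or many) for each input
element, and all of them are collected into a single DS. This works even if a
single element of your DS<String> contains several records. In the example
below, those records are separated by `|`.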

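import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TupleFlatMapExample {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // Each element may pack several records separated by "|".
      DataStream<String> dsString = env.fromElements("1,a,1.1|2,b,2.2,-2", "3,c|4,d,4.4");
      DataStream<Object> dsTuple = dsString.flatMap(new FlatMapFunction<String, Object>() {
         @Override
         public void flatMap(String value, Collector<Object> out) throws Exception {
            // "|" is a regex metacharacter, so it has to be escaped.
            for (String record : value.split("\\|")) {
               String[] split = record.split(",");
               // Emit one tuple per record; its arity depends on the number of fields.
               if (split.length == 2) {
                  out.collect(new Tuple2<>(Integer.valueOf(split[0]), split[1]));
               } else if (split.length == 3) {
                  out.collect(new Tuple3<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2])));
               } else {
                  // Assumes every other record has exactly 4 fields.
                  out.collect(new Tuple4<>(Integer.valueOf(split[0]), split[1],
                        Double.valueOf(split[2]), Long.valueOf(split[3])));
               }
            }
         }
      });

      dsTuple.print();
      env.execute();
   }
}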
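If you later want to turn the result into a table, every record needs the same number of columns. Here is a minimal plain-Java sketch of the parsing step that pads short records with null up to a fixed width. It assumes a maximum of 4 fields (`MAX_FIELDS`), and the names `PackedRecordParser` and `parsePacked` are hypothetical. Because it has no Flink dependency, you can test it on its own before calling it from the flatMap body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PackedRecordParser {

    // Hypothetical upper bound on the number of fields a record can have.
    static final int MAX_FIELDS = 4;

    // Splits one stream element such as "3,c|4,d,4.4" into records and pads
    // every record with nulls up to MAX_FIELDS, so all rows share one width.
    static List<String[]> parsePacked(String value) {
        List<String[]> rows = new ArrayList<>();
        // "|" is a regex metacharacter, so it must be escaped.
        for (String record : value.split("\\|")) {
            // A limit of -1 keeps trailing empty fields, e.g. "3,c," -> ["3", "c", ""].
            String[] fields = record.split(",", -1);
            if (fields.length > MAX_FIELDS) {
                throw new IllegalArgumentException("Too many fields: " + record);
            }
            // Arrays.copyOf fills the extra slots with null.
            rows.add(Arrays.copyOf(fields, MAX_FIELDS));
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : parsePacked("1,a,1.1|2,b,2.2,-2|3,c")) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Inside Flink you could call `parsePacked` from the flatMap and emit each padded array, for example wrapped in a `Row`, which can hold null fields. Converting the strings to Integer, Double, or Long per column is left out to keep the sketch short.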


On Wed, Jul 24, 2019 at 11:47 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:

> Hello Weng,
>
> This definitely helps a lot; however, I know my initial DS has a single row
> of content, so in theory I would just create one DS, which is what I need.
> That is why I need to know how to create a new DS from the environment
> within a map function.
>
> thanks so much
>
> On Tue, Jul 23, 2019 at 11:41 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi Andres,
>>
>> Thanks for the detailed explanation.
>>
>> About creating a new DS within a map function:
>>
>> If you created a new DS within the map function, you would create as many
>> DSs as there are elements in the old DS, which doesn't seem to be what you
>> want. (Besides, the map function runs inside the job at runtime, where no
>> execution environment is available for building new DataStreams.) I
>> suppose you want to create a DS<Tuple> from a DS<String>. If that is the
>> case, you can write something like this:
>>
>> public static void main(String[] args) throws Exception {
>>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>    DataStream<String> dsString = env.fromElements("1,a,1.1", "2,b,2.2,-2", "3,c", "4,d,4.4");
>>    DataStream<Object> dsTuple = dsString.map(s -> {
>>       String[] split = s.split(",");
>>       if (split.length == 2) {
>>          return new Tuple2<>(Integer.valueOf(split[0]), split[1]);
>>       } else if (split.length == 3) {
>>          return new Tuple3<>(Integer.valueOf(split[0]), split[1], Double.valueOf(split[2]));
>>       } else {
>>          return new Tuple4<>(Integer.valueOf(split[0]), split[1],
>>                Double.valueOf(split[2]), Long.valueOf(split[3]));
>>       }
>>    });
>>
>>    dsTuple.print();
>>    env.execute();
>> }
>>
>>
>> About how to dynamically create the DS<Tuple>:
>>
>>
>> As you can see in the code above, I did not create a DS<Tuple> but a
>> DS<Object>, because Flink does not accept the abstract Tuple class as a
>> stream type; it needs a concrete subclass such as Tuple2 or Tuple3. It
>> seems that you want to turn this new DS into a table, but if different
>> records have a different number of columns, this is not good practice,
>> because a table needs the same schema for every record. As a workaround,
>> you can fill a column with null when a record doesn't have it.
>>
>> Hope this solves your problem. If you have any other problems, feel free
>> to write back.
>>
>> On Wed, Jul 24, 2019 at 10:50 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Let me list my questions properly:
>>>
>>> * How do I capture the content of a DataStream in a String? Basically, I
>>> have a DS<String>, and the only ways I can use its content are within a
>>> map function, by printing it, by storing it somewhere, or through SQL
>>> queries. I need the content because, depending on it, I have to create
>>> another DS and later register it as a table in the table environment.
>>> That means I need both the values and the headers, and all of that
>>> information is inside the DS<String>. My first option was to use a map
>>> function, but apparently I can't create a new DS within a map function,
>>> let alone register it as a new table.
>>>
>>> My second option was to create a sort of public variable to store the
>>> DS<String> content and then create my UDF, but sadly that isn't allowed
>>> either. So my options would be to somehow store the content of the
>>> DS<String> publicly in a new variable, turn the DS<String> into a String,
>>> or write the content to a file, then read the file and parse the content
>>> again to provide the header and content for the new DS.
>>>
>>> * How do I dynamically create the DS<Tuple>? After parsing as described
>>> above, I might end up with an array of fields whose length varies
>>> (sometimes 4, 3, or 2), so I might need to switch between different tuple
>>> types or turn my content into a Row to create a DS<Row>.
>>>
>>> I'm looking forward to reading your comments.
>>>
>>> thanks so much
>>>
>>> On Tue, Jul 23, 2019 at 10:34 PM Caizhi Weng <tsreape...@gmail.com>
>>> wrote:
>>>
>>>> Hi Andres,
>>>>
>>>> Sorry, I can't quite follow your question... Do you mean how to split
>>>> the string into fields?
>>>>
>>>> There is a `split` method on Java's String class. You give it a regex
>>>> and it returns an array containing all the split fields.
>>>>
>>>> On Wed, Jul 24, 2019 at 10:28 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>>>
>>>>> Hello Weng,
>>>>>
>>>>> Thanks for your reply. However, I'm struggling to read the content of
>>>>> my DS, whose payload defines how many fields the message contains, into
>>>>> a String. That is why I thought of using a map function on that DS.
>>>>>
>>>>> The Tuple part can change over time: it can go from 3 or 4 fields down
>>>>> to 2, and it can keep changing all the time. How could I approach this
>>>>> challenge?
>>>>>
>>>>> thanks so much
>>>>>
>>>>> On Tue, Jul 23, 2019 at 10:23 PM Caizhi Weng <tsreape...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andres,
>>>>>>
>>>>>> Are the payloads strings? If so, one option is to store them as
>>>>>> strings and process them further with user-defined functions when you
>>>>>> need to use them.
>>>>>>
>>>>>> Another option is to store them in arrays.
>>>>>>
>>>>>> Also, if the types of the first 3 fields are the same for the first
>>>>>> and second payloads, you can use a Tuple4<> and set the last element to
>>>>>> null for the first payload (note that Flink's Tuple serializer does not
>>>>>> accept null fields, so a Row, which does, may be the safer choice).
>>>>>>
>>>>>> On Wed, Jul 24, 2019 at 10:09 AM Andres Angel <ingenieroandresan...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> I need to set the size of the Tuple that feeds a DS dynamically; let
>>>>>>> me explain. Let's assume the first payload I read has the format
>>>>>>> "field1,field2,field3", which would require a Tuple3<>, but a later
>>>>>>> payload may be "field1,field2,field3,field4", so my Tuple would need
>>>>>>> to be refined on the fly into a Tuple4<>.
>>>>>>>
>>>>>>> How could I create this dynamically? Any ideas?
>>>>>>>
>>>>>>> Thanks so much
>>>>>>>
>>>>>>
