Cool, thanks! Hequn. I will try that approach.

Vijay

On Thu, Nov 1, 2018 at 8:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Vijay,
>
> > I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow
> operation on the KeyedStream and then perform group operation on the
> resultant set to get total count etc.
>
> From your description, I think you can perform a TumblingEventTimeWindow
> first, something looks like:
>
>> // tumbling processing-time windows
>> input
>>     .keyBy(<key selector>)
>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>>     .<windowed transformation>(<window function>);
>
> then, you can perform a windowAll after the TumblingEventTimeWindow to get
> the final total count.
>
> Best,
> Hequn
>
>
>
> On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Thanks,Hequn.
>> If I have to do a TumblingWindow operation like:
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>>
>> I am not able to do that on the output of keyBy(..) which is a KeyedStream.
>>
>> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow 
>> operation on the KeyedStream
>>
>> and then perform group operation on the resultant set to get total count etc.
>>
>> I am only able to do only 1 of keyBy or timeWindowAll as follows:
>>
>>
>> .keyBy(*d._1,d._2*)
>> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>>
>> OR
>>
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> Doing this doesn't seem to be too helpful as the keyBy KeyedStream is lost 
>> in the next step:
>>
>> .keyBy(*d._1,d._2*)
>> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, 
>> TimeUnit.SECONDS))
>> .process(new WindowProcessing(FIVE_SECONDS))
>>
>>
>> TIA,
>>
>> Vijay
>>
>>
>>
>> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
>>> `inputStream`.
>>> While option 2 replicate all data to each task and option 3 split data
>>> into smaller groups without duplication.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I need to broadcast/parallelize an incoming stream(inputStream) into 5
>>>> streams with the same data. Each stream is keyed by different keys to do
>>>> various grouping operations on the set.
>>>>
>>>> Do I just use inputStream.keyBy(5 diff keys) and then just use the
>>>> DataStream to perform windowing/grouping operations ?
>>>>
>>>> *DataStream<Long> inputStream= ...*
>>>> *DataStream<Long>  keyBy1 = inputStream.keyBy((d) -> d._1);*
>>>> *DataStream<Long>  keyBy2 = inputStream.keyBy((d) -> d._2);*
>>>>
>>>> *DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function());// do
>>>> windowing/grouping operations in this function*
>>>> *DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function());// do
>>>> windowing/grouping operations in this function*
>>>>
>>>> out1Stream.print();
>>>> out2Stream.addSink(new Out2Sink());
>>>>
>>>> Will this work ?
>>>>
>>>> Or do I use the keyBy Stream with a broadcast function like this:
>>>>
>>>> *BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);*
>>>> *DataSTream out1Stream = keyBy1.connect(broadCastStream)*
>>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>>
>>>> *DataSTream out2Stream = keyBy2.connect(broadCastStream)*
>>>> * .process(new KeyedBroadcastProcessFunction...)*
>>>>
>>>> Or do I need to use split:
>>>>
>>>> *SplitStream<Long> source = inputStream.split(new MyOutputSelector());*
>>>> *source.select("").flatMap(new Key1Function()).addSink(out1Sink);*
>>>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>>
>>>>
>>>> static final class MyOutputSelector implements OutputSelector<Long> {
>>>> List<String> outputs = new ArrayList<String>();
>>>> public Iterable<String> select(Long value) {
>>>> outputs.add("");
>>>> return outputs;
>>>> }
>>>> }
>>>> TIA,
>>>>
>>>

Reply via email to