Hi Vijay,

> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow
> operation on the KeyedStream and then perform group operation on the
> resultant set to get total count etc.

From your description, I think you can perform a TumblingEventTimeWindow on
the keyed stream first, something like:

> // tumbling event-time windows
> input
>     .keyBy(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .<windowed transformation>(<window function>);
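To make the 5-second tumbling window in the snippet above concrete, here is a plain-Java model of how each timestamp is assigned to exactly one non-overlapping window. It is not Flink code: `TumblingAssignment` and `windowStart` are hypothetical names, and the arithmetic assumes non-negative millisecond timestamps and a zero window offset.

```java
// Plain-Java model of tumbling-window assignment (not Flink code).
class TumblingAssignment {
    // Hypothetical helper: start of the tumbling window containing the timestamp.
    // Assumes non-negative timestamps and a zero window offset.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5 seconds, matching Time.seconds(5)
        long[] timestamps = {0L, 4_999L, 5_000L, 12_345L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            // Each timestamp lands in exactly one half-open window [start, start + size).
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Running `main` prints `0 -> [0, 5000)`, `4999 -> [0, 5000)`, `5000 -> [5000, 10000)` and `12345 -> [10000, 15000)`: the window end is exclusive, so a timestamp of 5000 opens a new window.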

Then, you can apply a windowAll after the TumblingEventTimeWindow to get
the final total count.
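The two-step pipeline suggested above (count per key in each 5-second tumbling window, then a windowAll that sums those counts) can be modeled in plain Java to show what each stage produces. This is a sketch, not Flink code: the `Event` class, its fields, and the sample data are hypothetical stand-ins for the tuples in this thread, and the window arithmetic assumes non-negative timestamps and a zero offset.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "keyed tumbling window, then windowAll" (not Flink code).
class TwoStageCount {
    // Hypothetical event: a two-part key (like d._1, d._2) and an event timestamp in ms.
    static final class Event {
        final String k1;
        final String k2;
        final long ts;

        Event(String k1, String k2, long ts) {
            this.k1 = k1;
            this.k2 = k2;
            this.ts = ts;
        }
    }

    // Assumes non-negative timestamps and a zero window offset.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Stage 1, like keyBy(k1, k2).window(5 s) with a count:
    // window start -> (composite key -> number of events).
    static Map<Long, Map<String, Long>> perKeyCounts(List<Event> events, long size) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            String key = e.k1 + "," + e.k2;
            result.computeIfAbsent(windowStart(e.ts, size), w -> new TreeMap<>())
                  .merge(key, 1L, Long::sum);
        }
        return result;
    }

    // Stage 2, like windowAll(5 s) over the stage-1 results:
    // window start -> total count across all keys.
    static Map<Long, Long> totalsPerWindow(Map<Long, Map<String, Long>> perKey) {
        Map<Long, Long> totals = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : perKey.entrySet()) {
            long sum = 0L;
            for (long count : window.getValue().values()) {
                sum += count;
            }
            totals.put(window.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("a", "x", 1_000L),
                new Event("a", "x", 2_000L),
                new Event("b", "y", 3_000L),
                new Event("a", "x", 6_000L));
        Map<Long, Map<String, Long>> perKey = perKeyCounts(events, 5_000L);
        System.out.println(perKey);                  // {0={a,x=2, b,y=1}, 5000={a,x=1}}
        System.out.println(totalsPerWindow(perKey)); // {0=3, 5000=1}
    }
}
```

Because stage 1 has already collapsed each window to one count per key, the windowAll stage (which Flink runs as a single, non-parallel instance because it is not keyed) only receives those small pre-aggregated records instead of the raw stream. In an actual Flink job, event-time window results are, as far as I know, emitted with the window's max timestamp, so a following windowAll of the same size should group them into the matching window.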

Best,
Hequn



On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Thanks, Hequn.
> If I have to do a TumblingWindow operation like:
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>
> I am not able to do that on the output of keyBy(..) which is a KeyedStream.
>
> I was hoping to groupBy(key._1, key._2), etc., then do a tumblingWindow
> operation on the KeyedStream, and then perform a group operation on the
> resulting set to get the total count, etc.
>
> I am only able to do one of keyBy or timeWindowAll, as follows:
>
>
> .keyBy(d._1, d._2)
> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>
> OR
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
>
> Chaining them doesn't seem to help either, because the KeyedStream produced
> by keyBy is lost in the next step:
>
> .keyBy(d._1, d._2)
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
>
> TIA,
>
> Vijay
>
>
>
> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> Option 1 is the right answer: `keyBy1` and `keyBy2` each contain all the
>> data in `inputStream`. Option 2, by contrast, replicates all data to every
>> task, and option 3 splits the data into smaller groups without duplication.
>>
>> Best, Hequn
>>
>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I need to broadcast/parallelize an incoming stream (inputStream) into 5
>>> streams with the same data. Each stream is keyed by different keys to do
>>> various grouping operations on the set.
>>>
>>> Do I just use inputStream.keyBy(5 different keys) and then just use the
>>> DataStream to perform windowing/grouping operations?
>>>
>>> DataStream<Long> inputStream = ...
>>> DataStream<Long> keyBy1 = inputStream.keyBy((d) -> d._1);
>>> DataStream<Long> keyBy2 = inputStream.keyBy((d) -> d._2);
>>>
>>> // do windowing/grouping operations in this function
>>> DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function());
>>> // do windowing/grouping operations in this function
>>> DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function());
>>>
>>> out1Stream.print();
>>> out2Stream.addSink(new Out2Sink());
>>>
>>> Will this work?
>>>
>>> Or do I use the keyBy Stream with a broadcast function like this:
>>>
>>> BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
>>> DataStream out1Stream = keyBy1.connect(broadCastStream)
>>>     .process(new KeyedBroadcastProcessFunction...)
>>>
>>> DataStream out2Stream = keyBy2.connect(broadCastStream)
>>>     .process(new KeyedBroadcastProcessFunction...)
>>>
>>> Or do I need to use split:
>>>
>>> SplitStream<Long> source = inputStream.split(new MyOutputSelector());
>>> source.select("").flatMap(new Key1Function()).addSink(out1Sink);
>>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>
>>>
>>> static final class MyOutputSelector implements OutputSelector<Long> {
>>>     @Override
>>>     public Iterable<String> select(Long value) {
>>>         // Build a fresh list per element; a shared field would grow on every call.
>>>         List<String> outputs = new ArrayList<String>();
>>>         outputs.add("");
>>>         return outputs;
>>>     }
>>> }
>>>
>>> TIA,
>>>
>>
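Hequn's answer about option 1 can be illustrated with a simplified, plain-Java model of how records reach parallel subtasks. This is not Flink code: the record layout, the `PartitioningModel` names, and the hash-modulo routing are assumptions for illustration (Flink actually assigns keys to key groups before mapping them to subtasks). It only shows the property that matters here: each keyBy over the same input delivers every record to exactly one subtask, while broadcast delivers a copy to every subtask.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of record routing to parallel subtasks (not Flink code).
class PartitioningModel {
    // keyBy-style routing: each record goes to exactly ONE subtask, chosen from its key.
    // Assumption: hashCode modulo parallelism stands in for Flink's key-group assignment.
    static int[] keyByCounts(List<String[]> input, Function<String[], String> keySelector,
                             int parallelism) {
        int[] perSubtask = new int[parallelism];
        for (String[] record : input) {
            String key = keySelector.apply(record);
            perSubtask[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return perSubtask;
    }

    // broadcast-style routing: every record is copied to EVERY subtask.
    static int[] broadcastCounts(List<String[]> input, int parallelism) {
        int[] perSubtask = new int[parallelism];
        Arrays.fill(perSubtask, input.size());
        return perSubtask;
    }

    static int total(int[] perSubtask) {
        return Arrays.stream(perSubtask).sum();
    }

    public static void main(String[] args) {
        // Hypothetical two-field records, standing in for (d._1, d._2).
        List<String[]> input = Arrays.asList(
                new String[] {"a", "x"},
                new String[] {"b", "x"},
                new String[] {"a", "y"},
                new String[] {"c", "z"},
                new String[] {"b", "y"});
        int parallelism = 3;
        int[] byFirst = keyByCounts(input, d -> d[0], parallelism);  // like keyBy(d -> d._1)
        int[] bySecond = keyByCounts(input, d -> d[1], parallelism); // like keyBy(d -> d._2)
        int[] broadcast = broadcastCounts(input, parallelism);
        // Each keyed stream processes all 5 records once; broadcast delivers 5 x 3 copies.
        System.out.println(total(byFirst) + " " + total(bySecond) + " " + total(broadcast)); // 5 5 15
    }
}
```

With a parallelism of 3, both keyed streams see all 5 records exactly once, which is why option 1 is enough to fan the same data out to several differently keyed pipelines, whereas broadcasting hands every subtask all 5 records (15 deliveries in total).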
