Cool, thanks, Hequn! I will try that approach.

Vijay
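For reference, here is a minimal plain-Java model (not Flink code) of the two-stage approach Hequn suggests in the thread below. Stage 1 counts records per composite key within 5-second tumbling windows, as `keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5)))` followed by a count would. Stage 2 sums those per-key counts into one total per window, as a following `windowAll(...)` would. The `Event` class, its fields, and the sample timestamps are hypothetical, and in-memory maps stand in for Flink's distributed windowing; the sketch only shows what each stage computes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (NOT Flink API code) of a two-stage windowed count:
 *   stage 1 ~ keyBy(k1, k2).window(5 s tumbling) -> count per key and window
 *   stage 2 ~ windowAll(5 s tumbling)            -> sum of those counts per window
 * The Event type, its fields and the sample data are hypothetical.
 */
public class TwoStageCount {

    /** Hypothetical record: a composite key (k1, k2) plus an event timestamp in ms. */
    static final class Event {
        final String k1;
        final String k2;
        final long timestampMs;

        Event(String k1, String k2, long timestampMs) {
            this.k1 = k1;
            this.k2 = k2;
            this.timestampMs = timestampMs;
        }
    }

    static final long WINDOW_MS = 5_000L;

    /** Start of the 5 s tumbling window that contains ts (assumes ts >= 0). */
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    /** Stage 1: window start -> (composite key -> count), like a keyed tumbling window. */
    static Map<Long, Map<String, Long>> perKeyCounts(List<Event> events) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            String key = e.k1 + "|" + e.k2; // composite key, like keyBy(d._1, d._2)
            result.computeIfAbsent(windowStart(e.timestampMs), w -> new TreeMap<>())
                  .merge(key, 1L, Long::sum);
        }
        return result;
    }

    /** Stage 2: one non-keyed total per window, like windowAll over the stage-1 output. */
    static Map<Long, Long> totalPerWindow(Map<Long, Map<String, Long>> perKey) {
        Map<Long, Long> totals = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : perKey.entrySet()) {
            long sum = 0;
            for (long count : window.getValue().values()) {
                sum += count;
            }
            totals.put(window.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", "x", 1_000), new Event("a", "x", 2_000),
            new Event("b", "y", 3_000), new Event("a", "x", 6_000));
        Map<Long, Map<String, Long>> perKey = perKeyCounts(events);
        System.out.println(perKey);                 // {0={a|x=2, b|y=1}, 5000={a|x=1}}
        System.out.println(totalPerWindow(perKey)); // {0=3, 5000=1}
    }
}
```

In an actual Flink job, a non-keyed `windowAll` runs with parallelism 1, so putting it after the keyed window keeps that bottleneck small: it only receives one pre-aggregated record per key and window instead of every raw event.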
On Thu, Nov 1, 2018 at 8:18 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Vijay,
>
>> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow
>> operation on the KeyedStream and then perform group operation on the
>> resultant set to get total count etc.
>
> From your description, I think you can first apply a tumbling window on
> the KeyedStream (TumblingEventTimeWindows for event time, or
> TumblingProcessingTimeWindows as below), something like:
>
>> // tumbling processing-time windows
>> input
>>     .keyBy(<key selector>)
>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>>     .<windowed transformation>(<window function>);
>
> Then you can apply a windowAll on the output of that keyed window to get
> the final total count.
>
> Best,
> Hequn
>
> On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Thanks, Hequn.
>> If I have to do a tumbling window operation like:
>>
>>     .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>>
>> I am not able to do that on the output of keyBy(..), which is a KeyedStream.
>>
>> I was hoping to groupBy(key._1,key._2) etc., then do a tumbling window
>> operation on the KeyedStream,
>> and then perform a group operation on the resultant set to get the total count etc.
>>
>> I am only able to do one of keyBy or timeWindowAll, as follows:
>>
>>     .keyBy(d._1, d._2)
>>     .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>>
>> OR
>>
>>     .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>>     .process(new WindowProcessing(FIVE_SECONDS))
>>
>> Chaining them doesn't help much, because the KeyedStream from keyBy is lost
>> in the next step:
>>
>>     .keyBy(d._1, d._2)
>>     .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>>     .process(new WindowProcessing(FIVE_SECONDS))
>>
>> TIA,
>>
>> Vijay
>>
>> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Vijay,
>>>
>>> Option 1 is the right answer: `keyBy1` and `keyBy2` each contain all the data
>>> in `inputStream`. Option 2, by contrast, replicates all data to every task, and
>>> option 3 splits the data into smaller groups without duplication.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I need to broadcast/parallelize an incoming stream (inputStream) into 5
>>>> streams with the same data. Each stream is keyed by a different key to do
>>>> various grouping operations on the set.
>>>>
>>>> Do I just call inputStream.keyBy(...) with the 5 different keys and then use
>>>> each resulting DataStream to perform windowing/grouping operations?
>>>>
>>>>     DataStream<Long> inputStream = ...
>>>>     DataStream<Long> keyBy1 = inputStream.keyBy((d) -> d._1);
>>>>     DataStream<Long> keyBy2 = inputStream.keyBy((d) -> d._2);
>>>>
>>>>     // do windowing/grouping operations in these functions
>>>>     DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function());
>>>>     DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function());
>>>>
>>>>     out1Stream.print();
>>>>     out2Stream.addSink(new Out2Sink());
>>>>
>>>> Will this work?
>>>>
>>>> Or do I use the keyed streams with a broadcast stream, like this:
>>>>
>>>>     BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
>>>>     DataStream out1Stream = keyBy1.connect(broadCastStream)
>>>>         .process(new KeyedBroadcastProcessFunction...);
>>>>
>>>>     DataStream out2Stream = keyBy2.connect(broadCastStream)
>>>>         .process(new KeyedBroadcastProcessFunction...);
>>>>
>>>> Or do I need to use split:
>>>>
>>>>     SplitStream<Long> source = inputStream.split(new MyOutputSelector());
>>>>     source.select("").flatMap(new Key1Function()).addSink(out1Sink);
>>>>     source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>>
>>>>     static final class MyOutputSelector implements OutputSelector<Long> {
>>>>         @Override
>>>>         public Iterable<String> select(Long value) {
>>>>             // build a fresh list per element; a shared field would grow with every record
>>>>             List<String> outputs = new ArrayList<String>();
>>>>             outputs.add("");
>>>>             return outputs;
>>>>         }
>>>>     }
>>>>
>>>> TIA,
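Hequn's point about option 1, that `keyBy1` and `keyBy2` each see all of `inputStream`, can be illustrated with a small plain-Java analogy (not Flink code): the same list is grouped twice by two different fields, and each grouping still accounts for every record. `Reading`, its fields, and `countPerKey` are hypothetical; `Collectors.groupingBy` stands in for `keyBy`, which in Flink additionally routes each key's records to one parallel task.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Plain-Java analogy (NOT Flink API code) for option 1: the same input is
 * grouped twice by two different key fields. Each grouping still covers
 * every input record; keying only decides which group a record joins.
 * Reading and its fields are hypothetical.
 */
public class FanOutKeyBy {

    /** Hypothetical record with two candidate key fields (think d._1 and d._2). */
    static final class Reading {
        final String sensor;
        final String region;

        Reading(String sensor, String region) {
            this.sensor = sensor;
            this.region = region;
        }
    }

    /** Count records per key; stands in for keyBy(key) followed by a per-key count. */
    static <K> Map<K, Long> countPerKey(List<Reading> input, Function<Reading, K> key) {
        return input.stream()
            .collect(Collectors.groupingBy(key, TreeMap::new, Collectors.counting()));
    }

    /** Total number of records across all groups of one keying. */
    static long total(Map<?, Long> counts) {
        long n = 0;
        for (long c : counts.values()) {
            n += c;
        }
        return n;
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
            new Reading("s1", "east"), new Reading("s2", "east"), new Reading("s1", "west"));

        // Two independent keyings of the same input, like keyBy1 and keyBy2.
        Map<String, Long> bySensor = countPerKey(input, r -> r.sensor);
        Map<String, Long> byRegion = countPerKey(input, r -> r.region);

        System.out.println(bySensor + " total=" + total(bySensor)); // {s1=2, s2=1} total=3
        System.out.println(byRegion + " total=" + total(byRegion)); // {east=2, west=1} total=3
    }
}
```

Both keyings total 3, the size of the input: within one keying each record lands in exactly one group, and across keyings every record is seen again. That is why calling keyBy several times on the same DataStream already gives the fan-out, without a broadcast or a split.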