Thanks, Hequn.

If I want to do a TumblingWindow operation like:

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))

I am not able to do that on the output of keyBy(..), which is a KeyedStream. I was hoping to groupBy(key._1, key._2) etc., then do a tumbling window operation on the KeyedStream, and then perform a group operation on the resulting set to get the total count etc.

I am only able to do one of keyBy or timeWindowAll, as follows:

.keyBy(d._1, d._2)
.process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))

OR

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))

Chaining them doesn't seem to help much, as the keyBy KeyedStream is lost in the next step:

.keyBy(d._1, d._2)
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))

TIA,
Vijay

On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Vijay,
>
> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in
> `inputStream`.
> Option 2 replicates all data to each task, while option 3 splits data
> into smaller groups without duplication.
>
> Best, Hequn
>
> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi,
>> I need to broadcast/parallelize an incoming stream (inputStream) into 5
>> streams with the same data. Each stream is keyed by different keys to do
>> various grouping operations on the set.
>>
>> Do I just use inputStream.keyBy(5 diff keys) and then just use the
>> DataStream to perform windowing/grouping operations?
>>
>> DataStream<Long> inputStream = ...
>> DataStream<Long> keyBy1 = inputStream.keyBy((d) -> d._1);
>> DataStream<Long> keyBy2 = inputStream.keyBy((d) -> d._2);
>>
>> DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function()); // do windowing/grouping operations in this function
>> DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function()); // do windowing/grouping operations in this function
>>
>> out1Stream.print();
>> out2Stream.addSink(new Out2Sink());
>>
>> Will this work?
>>
>> Or do I use the keyBy stream with a broadcast function like this:
>>
>> BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
>> DataStream out1Stream = keyBy1.connect(broadCastStream)
>>     .process(new KeyedBroadcastProcessFunction...)
>>
>> DataStream out2Stream = keyBy2.connect(broadCastStream)
>>     .process(new KeyedBroadcastProcessFunction...)
>>
>> Or do I need to use split:
>>
>> SplitStream<Long> source = inputStream.split(new MyOutputSelector());
>> source.select("").flatMap(new Key1Function()).addSink(out1Sink);
>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>
>>
>> static final class MyOutputSelector implements OutputSelector<Long> {
>>     List<String> outputs = new ArrayList<String>();
>>     public Iterable<String> select(Long value) {
>>         outputs.add("");
>>         return outputs;
>>     }
>> }
>> TIA,
>>
>
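On the follow-up question: in the Flink 1.x DataStream API, `timeWindowAll(...)` builds a non-keyed `AllWindowedStream`, so applying it after `keyBy(...)` drops the key grouping. The keyed counterpart is `KeyedStream.timeWindow(...)`, e.g. `.keyBy(...).timeWindow(Time.of(FIVE, TimeUnit.SECONDS)).process(...)`; if `WindowProcessing` is a `ProcessAllWindowFunction`, it would need to become a `ProcessWindowFunction` for that call. The sketch below does not use Flink at all. It is a plain-Java model, assuming hypothetical events that carry two key fields and a millisecond timestamp, that contrasts counting per (key, 5-second tumbling window) with counting per window only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the two windowing choices; no Flink classes are used.
final class TumblingWindowSketch {

    // Hypothetical event: two key fields (standing in for d._1 and d._2) and a timestamp in ms.
    static final class Event {
        final String k1;
        final String k2;
        final long timestampMs;

        Event(String k1, String k2, long timestampMs) {
            this.k1 = k1;
            this.k2 = k2;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window containing the timestamp (assumes non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Like keyBy(k1, k2) followed by a keyed tumbling window: one count per (key, window).
    static Map<String, Long> countPerKeyAndWindow(List<Event> events, long sizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            String group = e.k1 + "|" + e.k2 + "@" + windowStart(e.timestampMs, sizeMs);
            counts.merge(group, 1L, Long::sum);
        }
        return counts;
    }

    // Like timeWindowAll: the key fields are ignored, so there is one count per window.
    static Map<Long, Long> countPerWindow(List<Event> events, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(windowStart(e.timestampMs, sizeMs), 1L, Long::sum);
        }
        return counts;
    }
}
```

With a 5,000 ms window, events for key (a, x) at 1,000 ms and 4,000 ms land in the same window starting at 0, while one at 6,000 ms opens the window starting at 5,000; the keyed version keeps (a, x) and (b, y) apart, whereas the all-window version merges them into one count per window.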
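Separately, the quoted `MyOutputSelector` keeps `outputs` as an instance field and appends `""` on every call to `select`, so the list it returns gains one entry per record and keeps growing as records arrive. Flink's `OutputSelector<OUT>` declares a single method, `Iterable<String> select(OUT value)`. So that this sketch compiles without Flink, it declares a hypothetical local interface `OutputSelectorLike` with the same shape, and contrasts the accumulating version with one that returns a fresh one-element list per record.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in with the same shape as Flink's OutputSelector<OUT>.
interface OutputSelectorLike<T> {
    Iterable<String> select(T value);
}

// Mirrors the quoted MyOutputSelector: the list is a field, so it grows on every call.
final class AccumulatingSelector implements OutputSelectorLike<Long> {
    final List<String> outputs = new ArrayList<>();

    @Override
    public Iterable<String> select(Long value) {
        outputs.add("");
        return outputs;
    }
}

// Returns a new immutable one-element list per record, so nothing accumulates.
final class FreshListSelector implements OutputSelectorLike<Long> {
    @Override
    public Iterable<String> select(Long value) {
        return Collections.singletonList("");
    }
}

final class SelectorDemo {
    // Counts how many output names a selector returned for one record.
    static int namesReturned(Iterable<String> names) {
        int n = 0;
        for (String ignored : names) {
            n++;
        }
        return n;
    }
}
```

After three records, the accumulating selector returns three names for the third record while the fresh-list version still returns one; both route every record to the `""` output.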