Hi Vijay,

> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow operation on the KeyedStream and then perform group operation on the resultant set to get total count etc.

From your description, I think you can perform a TumblingEventTimeWindow first, something like:

> // tumbling processing-time windows
> input
>     .keyBy(<key selector>)
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>     .<windowed transformation>(<window function>);

Then you can perform a windowAll after the TumblingEventTimeWindow to get the final total count.

Best, Hequn

On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Thanks, Hequn.
> If I have to do a TumblingWindow operation like:
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>
> I am not able to do that on the output of keyBy(..), which is a KeyedStream.
>
> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow operation on the KeyedStream
> and then perform group operation on the resultant set to get total count etc.
>
> I am able to do only one of keyBy or timeWindowAll, as follows:
>
> .keyBy(d._1,d._2)
> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>
> OR
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
> Doing this doesn't seem to be too helpful as the keyBy KeyedStream is lost in the next step:
>
> .keyBy(d._1,d._2)
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
> TIA,
> Vijay
>
> On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Vijay,
>>
>> Option 1 is the right answer. `keyBy1` and `keyBy2` contain all data in `inputStream`.
>> Option 2 replicates all data to each task, while option 3 splits data into smaller groups without duplication.
>>
>> Best, Hequn
>>
>> On Fri, Oct 26, 2018 at 7:01 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>
>>> Hi,
>>> I need to broadcast/parallelize an incoming stream (inputStream) into 5 streams with the same data. Each stream is keyed by different keys to do various grouping operations on the set.
>>>
>>> Do I just use inputStream.keyBy(5 diff keys) and then just use the DataStream to perform windowing/grouping operations?
>>>
>>> DataStream<Long> inputStream = ...
>>> DataStream<Long> keyBy1 = inputStream.keyBy((d) -> d._1);
>>> DataStream<Long> keyBy2 = inputStream.keyBy((d) -> d._2);
>>>
>>> // do windowing/grouping operations in this function
>>> DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function());
>>> // do windowing/grouping operations in this function
>>> DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function());
>>>
>>> out1Stream.print();
>>> out2Stream.addSink(new Out2Sink());
>>>
>>> Will this work?
>>>
>>> Or do I use the keyBy stream with a broadcast function like this:
>>>
>>> BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
>>> DataStream out1Stream = keyBy1.connect(broadCastStream)
>>>     .process(new KeyedBroadcastProcessFunction...)
>>>
>>> DataStream out2Stream = keyBy2.connect(broadCastStream)
>>>     .process(new KeyedBroadcastProcessFunction...)
>>>
>>> Or do I need to use split:
>>>
>>> SplitStream<Long> source = inputStream.split(new MyOutputSelector());
>>> source.select("").flatMap(new Key1Function()).addSink(out1Sink);
>>> source.select("").flatMap(new Key2Function()).addSink(out2Sink);
>>>
>>> static final class MyOutputSelector implements OutputSelector<Long> {
>>>     List<String> outputs = new ArrayList<String>();
>>>     public Iterable<String> select(Long value) {
>>>         outputs.add("");
>>>         return outputs;
>>>     }
>>> }
>>> TIA,
>>>
>>
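
To make the two-step suggestion at the top of this thread concrete (a keyed tumbling window first, then a windowAll over its output for the total), here is a rough plain-JDK sketch of what each step computes. It is not Flink API code and is not taken from anyone's job in this thread: the `Event` class, the `"|"` composite key and all method names are made up for illustration, and it assumes windows aligned to timestamp 0 with an inclusive start and exclusive end. Stage 1 stands in for `keyBy(...).window(...)` with a count per key; stage 2 stands in for a `windowAll` of the same size that sums those counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-JDK simulation of the two-stage aggregation; this is NOT Flink API code.
final class TwoStageWindowCount {

    // Hypothetical event: two key fields (standing in for d._1 and d._2) plus a timestamp.
    static final class Event {
        final String key1;
        final String key2;
        final long timestampMillis;

        Event(String key1, String key2, long timestampMillis) {
            this.key1 = key1;
            this.key2 = key2;
            this.timestampMillis = timestampMillis;
        }
    }

    // Start of the tumbling window that contains the timestamp.
    // Assumption: windows are aligned to 0, start inclusive, end exclusive.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
    }

    // Stage 1 (keyed tumbling window): one count per (window start, composite key).
    static Map<Long, Map<String, Long>> countPerKeyAndWindow(List<Event> events, long windowSizeMillis) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMillis, windowSizeMillis);
            String compositeKey = e.key1 + "|" + e.key2;
            counts.computeIfAbsent(start, s -> new TreeMap<>()).merge(compositeKey, 1L, Long::sum);
        }
        return counts;
    }

    // Stage 2 (non-keyed window over stage-1 output): sum the per-key counts of each window.
    static Map<Long, Long> totalPerWindow(Map<Long, Map<String, Long>> perKeyCounts) {
        Map<Long, Long> totals = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : perKeyCounts.entrySet()) {
            long total = 0L;
            for (long count : window.getValue().values()) {
                total += count;
            }
            totals.put(window.getKey(), total);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", "x", 1_000L),
            new Event("a", "x", 4_999L),
            new Event("b", "y", 2_000L),
            new Event("a", "x", 5_000L));
        Map<Long, Map<String, Long>> perKey = countPerKeyAndWindow(events, 5_000L);
        System.out.println(perKey);                 // {0={a|x=2, b|y=1}, 5000={a|x=1}}
        System.out.println(totalPerWindow(perKey)); // {0=3, 5000=1}
    }
}
```

The point of the sketch is only that the per-key results survive stage 1, so the grouping is not lost before the total is taken. In an actual job the second stage would run on the stream emitted by the first window operator rather than on an in-memory map.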