Hi,

apply() will be called separately for each key, so once per key for each window.

On Wed, Oct 12, 2016 at 2:25 PM, Swapnil Chougule <the.swapni...@gmail.com> wrote:
> Thanks Aljoscha.
>
> Whenever I am using WindowFunction.apply() on keyed stream, apply() will
> be called once or multiple times (equal to number of keys in that windowed
> stream)?
>
> Ex:
> DataStream<Boolean> dataStream = env
>     .socketTextStream("localhost", 9999)
>     .flatMap(new Splitter())
>     .keyBy(0)
>     .timeWindow(Time.seconds(10))
>     .apply(new WindowFunction<Tuple2<String,Integer>,
>             Boolean, Tuple, TimeWindow>() {
>
>         @Override
>         public void apply(Tuple key, TimeWindow window,
>                 Iterable<Tuple2<String, Integer>> input,
>                 Collector<Boolean> out) throws Exception {
>             //Some business logic
>         }
>     });
>
> Regards,
> Swapnil
>
> On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> WindowFunction.apply() will be called once for each window so you should
>> be able to do the setup/teardown in there. open() and close() are called at
>> the start of processing, end of processing, respectively.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <the.swapni...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am using tumbling window functionality having window size 5 minutes.
>>> I want to perform setup & teardown functionality for each window. I
>>> tried using RichWindowFunction but it didn't work for me.
>>> Can anybody tell me how can I do it ?
>>>
>>> Attaching code snippet what I tried
>>>
>>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>>     .timeWindow(Time.seconds(300)).apply(new
>>>     RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>,
>>>             Boolean, Tuple, TimeWindow>() {
>>>
>>>         @Override
>>>         public void open(Configuration parameters) throws Exception {
>>>             super.open(parameters);
>>>             //setup method
>>>         }
>>>
>>>         public void apply(Tuple key, TimeWindow window,
>>>                 Iterable<Tuple2<Tuple2<Integer, Integer>, Long>> input,
>>>                 Collector<Boolean> out) throws Exception {
>>>             //do processing
>>>         }
>>>
>>>         @Override
>>>         public void close() throws Exception {
>>>             //tear down method
>>>             super.close();
>>>         }
>>>     });
>>>
>>> Thanks,
>>> Swapnil
>>>
>>
>
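
For readers of this thread, here is a rough illustration of the behaviour described in the top reply: on a keyed stream, the window function is invoked separately for every key that has elements in a given window. This is a plain-Java model only, not Flink code and not how Flink implements windowing. The names PerKeyWindowSketch, Event, WindowFn and runTumblingWindows are hypothetical and made up for this sketch, and it assumes non-negative timestamps and simple tumbling windows with no late data or custom triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyed tumbling windows. NOT Flink code; all names are hypothetical.
final class PerKeyWindowSketch {

    /** Hypothetical event: a key, a value, and a non-negative timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int value;
        final long timestampMs;

        Event(String key, int value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Hypothetical stand-in for a window function: sees one key and one window per call. */
    interface WindowFn<R> {
        R apply(String key, long windowStartMs, List<Event> input);
    }

    /** Buckets events by (window start, key) and calls fn once per non-empty bucket. */
    static <R> List<R> runTumblingWindows(List<Event> events, long windowSizeMs, WindowFn<R> fn) {
        // TreeMap keeps windows and keys in a deterministic (sorted) order.
        Map<Long, Map<String, List<Event>>> buckets = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            buckets.computeIfAbsent(windowStart, w -> new TreeMap<>())
                   .computeIfAbsent(e.key, k -> new ArrayList<>())
                   .add(e);
        }
        List<R> results = new ArrayList<>();
        for (Map.Entry<Long, Map<String, List<Event>>> window : buckets.entrySet()) {
            for (Map.Entry<String, List<Event>> perKey : window.getValue().entrySet()) {
                // One call per key per window, mirroring the answer in this thread.
                results.add(fn.apply(perKey.getKey(), window.getKey(), perKey.getValue()));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 1, 1_000L));
        events.add(new Event("b", 1, 2_000L));
        events.add(new Event("a", 1, 3_000L));
        events.add(new Event("a", 1, 12_000L));

        // 10-second tumbling windows, as in the example quoted above.
        List<String> calls = runTumblingWindows(events, 10_000L,
                (key, start, input) -> key + "@" + start + " n=" + input.size());
        for (String call : calls) {
            System.out.println(call);
        }
    }
}
```

With the four sample events, the first 10-second window holds keys "a" and "b", so the function is called twice for that window, and once more for key "a" in the second window. Per-window setup and teardown written inside the function body would therefore run once per key and window in this model, which is worth keeping in mind when the setup is expensive.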