Thanks, Aljoscha.

When I use WindowFunction.apply() on a keyed stream, will apply() be
called once per window, or multiple times (once for each key in that
windowed stream)?

Ex:
DataStream<Boolean> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Integer>, Boolean, Tuple, TimeWindow>() {

                    @Override
                    public void apply(Tuple key, TimeWindow window,
                            Iterable<Tuple2<String, Integer>> input,
                            Collector<Boolean> out) throws Exception {
                        // Some business logic
                    }
                });
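
To make the two possibilities concrete, here is a minimal plain-Java sketch
(no Flink dependency) that models the "once per key per window" reading. That
reading is my understanding of how keyed windows are evaluated, not something
confirmed in this thread, and the sketch is only a model, not Flink's
implementation. All names in it (KeyedWindowModel, Event, windowStart,
groupByKeyAndWindow, applyOnce) are hypothetical. It also shows per-call
setup/teardown done inside the apply() stand-in with try/finally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed tumbling window; this is NOT Flink code.
class KeyedWindowModel {

    // One input record: a key, a value, and a timestamp in milliseconds.
    static final class Event {
        final String key;
        final int value;
        final long timestampMs;

        Event(String key, int value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window containing the timestamp
    // (assumes non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Groups values by (key, window start). Under the assumption stated
    // above, each group corresponds to one apply() invocation.
    static Map<String, List<Integer>> groupByKeyAndWindow(List<Event> events, long sizeMs) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            String groupId = e.key + "@" + windowStart(e.timestampMs, sizeMs);
            groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(e.value);
        }
        return groups;
    }

    // Stand-in for apply(): setup and teardown wrap the business logic.
    static int applyOnce(List<Integer> input, List<String> log) {
        log.add("setup");
        try {
            int sum = 0;
            for (int v : input) {
                sum += v;
            }
            return sum;
        } finally {
            log.add("teardown"); // runs even if the business logic throws
        }
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 1, 1_000L));
        events.add(new Event("b", 2, 2_000L));
        events.add(new Event("a", 3, 9_000L));
        events.add(new Event("a", 4, 12_000L));

        List<String> log = new ArrayList<>();
        Map<String, List<Integer>> groups = groupByKeyAndWindow(events, 10_000L);
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            System.out.println(g.getKey() + " -> " + applyOnce(g.getValue(), log));
        }
        System.out.println("apply() calls: " + groups.size());
    }
}
```

With 10-second windows, the four sample events fall into three (key, window)
groups: keys "a" and "b" in the first window and key "a" in the second, so the
apply() stand-in runs three times in this model.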

Regards,
Swapnil

On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> WindowFunction.apply() will be called once for each window, so you should
> be able to do the setup/teardown in there. open() and close() are called at
> the start and end of processing, respectively.
>
> Cheers,
> Aljoscha
>
> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <the.swapni...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> I am using the tumbling window functionality with a window size of 5 minutes.
>> I want to perform setup and teardown for each window. I tried
>> using RichWindowFunction, but it didn't work for me.
>> Can anybody tell me how I can do it?
>>
>> I am attaching the code snippet I tried:
>>
>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>         .timeWindow(Time.seconds(300))
>>         .apply(new RichWindowFunction<Tuple2<Tuple2<Integer, Integer>, Long>, Boolean, Tuple, TimeWindow>() {
>>
>>                 @Override
>>                 public void open(Configuration parameters) throws Exception {
>>                     super.open(parameters);
>>                     //setup method
>>                 }
>>
>>                 @Override
>>                 public void apply(Tuple key, TimeWindow window,
>>                         Iterable<Tuple2<Tuple2<Integer, Integer>, Long>> input,
>>                         Collector<Boolean> out) throws Exception {
>>                     //do processing
>>                 }
>>
>>                 @Override
>>                 public void close() throws Exception {
>>                     //tear down method
>>                     super.close();
>>                 }
>>             });
>>
>> Thanks,
>> Swapnil
>>
>
