Hi,
apply() will be called once per key for each window, so it runs multiple
times when the windowed stream contains more than one key.
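
To make the per-key behaviour concrete, here is a minimal plain-Java sketch (no Flink dependency) that groups events into tumbling windows per key and invokes a callback once for every (window, key) pair. The names PerKeyWindowSketch, Event, KeyedWindowFn and run are hypothetical and only for illustration; this is a simulation of the calling pattern under the assumption of non-negative timestamps, not how Flink implements windows internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerKeyWindowSketch {

    /** Hypothetical (key, value) record with a timestamp in milliseconds. */
    static final class Event {
        final String key;
        final int value;
        final long timestampMs;

        Event(String key, int value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Hypothetical stand-in for a window function: called once per (window, key). */
    interface KeyedWindowFn {
        void apply(String key, long windowStartMs, List<Event> input, List<String> out);
    }

    /** Assigns events to tumbling windows per key, then calls fn once per group. */
    static List<String> run(List<Event> events, long windowSizeMs, KeyedWindowFn fn) {
        // windowStart -> key -> events; TreeMap keeps the call order deterministic.
        Map<Long, Map<String, List<Event>>> panes = new TreeMap<>();
        for (Event e : events) {
            // Assumes non-negative timestamps.
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            panes.computeIfAbsent(windowStart, w -> new TreeMap<>())
                 .computeIfAbsent(e.key, k -> new ArrayList<>())
                 .add(e);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, List<Event>>> window : panes.entrySet()) {
            for (Map.Entry<String, List<Event>> keyed : window.getValue().entrySet()) {
                // One invocation per key within this window.
                fn.apply(keyed.getKey(), window.getKey(), keyed.getValue(), out);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a", 1, 1_000L),
            new Event("b", 2, 2_000L),
            new Event("a", 3, 3_000L),
            new Event("a", 4, 12_000L));
        // 10-second tumbling windows, as in the quoted example.
        List<String> calls = run(events, 10_000L, (key, start, input, out) ->
            out.add(key + "@" + start + " n=" + input.size()));
        // Prints: a@0 n=2, b@0 n=1, a@10000 n=1 (one line each)
        calls.forEach(System.out::println);
    }
}
```

With two keys in the first window and one key in the second, the callback fires three times in total, which mirrors what you should expect from apply() on a keyed, windowed stream.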

On Wed, Oct 12, 2016 at 2:25 PM, Swapnil Chougule <the.swapni...@gmail.com>
wrote:

> Thanks Aljoscha.
>
> When I use WindowFunction.apply() on a keyed stream, will apply() be
> called once or multiple times (equal to the number of keys in that windowed
> stream)?
>
> Ex:
> DataStream<Boolean> dataStream = env
>                 .socketTextStream("localhost", 9999)
>                 .flatMap(new Splitter())
>                 .keyBy(0)
>                 .timeWindow(Time.seconds(10))
>                 .apply(new WindowFunction<Tuple2<String,Integer>,
> Boolean, Tuple, TimeWindow>() {
>
>                     @Override
>                     public void apply(Tuple key, TimeWindow window,
>                             Iterable<Tuple2<String, Integer>> input,
>                             Collector<Boolean> out) throws Exception {
>                      //Some business logic
>                     }
>                 });
>
> Regards,
> Swapnil
>
> On Wed, Sep 14, 2016 at 2:26 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> WindowFunction.apply() will be called once for each window, so you should
>> be able to do the setup/teardown in there. open() and close() are called at
>> the start and end of processing, respectively.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 14 Sep 2016 at 09:04 Swapnil Chougule <the.swapni...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am using tumbling windows with a window size of 5 minutes.
>>> I want to perform setup and teardown for each window. I
>>> tried using RichWindowFunction, but it didn't work for me.
>>> Can anybody tell me how I can do this?
>>>
>>> Attaching the code snippet I tried:
>>>
>>> impressions.map(new LineItemAdUnitAggr()).keyBy(0)
>>> .timeWindow(Time.seconds(300)).apply(new 
>>> RichWindowFunction<Tuple2<Tuple2<Integer,Integer>,Long>,
>>> Boolean, Tuple, TimeWindow>() {
>>>
>>>                 @Override
>>>                 public void open(Configuration parameters) throws
>>> Exception {
>>>                     super.open(parameters);
>>>                     //setup method
>>>                 }
>>>
>>>                 @Override
>>>                 public void apply(Tuple key, TimeWindow window,
>>>                         Iterable<Tuple2<Tuple2<Integer, Integer>,
>>> Long>> input,
>>>                         Collector<Boolean> out) throws Exception {
>>>                     //do processing
>>>                 }
>>>
>>>                 @Override
>>>                 public void close() throws Exception {
>>>                     //tear down method
>>>                     super.close();
>>>                 }
>>>             });
>>>
>>> Thanks,
>>> Swapnil
>>>
>>
>
