Thanks for the answers. My scenario is:

| Window A |
| Window B |
                   | Window C |

If no events are received for Window C, how would the process function
know that both Window A and Window B have finished and that their results
need to be aggregated before the sink is called?

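For reference, here is a minimal plain-Java sketch of the "buffer until the watermark" idea from the quoted replies below. It does not use Flink at all: the class and method names (WindowResultBuffer, add, onWatermark) are made up for illustration, and it assumes that every window result carries a timestamp derived from its window end and that a watermark t means no more results with a timestamp at or before t will arrive. In a real job, a ProcessFunction with an event-time timer would presumably play the role of onWatermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (no Flink dependency) of buffering window results
 * until a watermark arrives. All names are hypothetical.
 */
class WindowResultBuffer<T> {
    // Buffered results, keyed by the timestamp of the window result
    // (assumed here to be derived from the window end time).
    private final TreeMap<Long, List<T>> byTimestamp = new TreeMap<>();

    /** Called for every incoming window result. */
    void add(long timestamp, T result) {
        byTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(result);
    }

    /**
     * Called when the watermark advances. Removes and returns one batch per
     * timestamp for every result whose timestamp is at or before the watermark.
     */
    List<List<T>> onWatermark(long watermark) {
        // headMap is a live view backed by the TreeMap.
        Map<Long, List<T>> complete = byTimestamp.headMap(watermark, true);
        List<List<T>> batches = new ArrayList<>(complete.values());
        complete.clear(); // clearing the view drops the flushed entries from the buffer
        return batches;
    }
}
```

In terms of the scenario above: the results of Window A and Window B are added with the same timestamp and come back together as one batch as soon as a watermark at or past that timestamp is seen. Nothing in the sketch depends on Window C producing a result. What it does depend on is the watermark itself advancing, which in Flink is driven by the sources and timestamp assigners rather than by window output.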

On 22 June 2017 at 16:27, Fabian Hueske <> wrote:

> Hi Ahmad,
> Flink's watermark mechanism guarantees that when you receive a watermark
> for time t all records with a timestamp smaller than t have been received
> as well.
> Records emitted from a window have the timestamp of their end time. So,
> when the ProcessFunction receives a watermark for 12:00:00, you can be sure
> that you have also received all records for windows that closed before 12:00:00.
> The function should buffer all records it receives between watermarks as
> state and once it receives a watermark (triggering of a registered
> event-time timer) it should write the buffered records out.
> Btw, this only works for event-time windows, not for processing-time windows.
> Cheers, Fabian
> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <>:
>> Hi Stefan,
>> How would the process function know that the last window result has arrived?
>> The sliding windows slide every 5 minutes, which means that a window for a new
>> time range, or a new watermark, will only arrive after 5 minutes.
>> Thanks
>> On 22 June 2017 at 15:10, Stefan Richter <>
>> wrote:
>>> The process function has the signature
>>> void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>> where the context provides access to the current watermark, and you
>>> can also register timer callbacks that trigger when a certain
>>> watermark is reached. You can simply monitor the watermark through the
>>> context for each incoming window result. *Start* time is not important,
>>> because you know that you have collected the results for all windows with a
>>> smaller *end* time than the watermark that you currently see in the
>>> context, because this is Flink's notion of completeness. This means you can
>>> prepare those windows and aggregate results and send them downstream to the
>>> sink.
>>> Am 22.06.2017 um 15:46 schrieb Ahmad Hassan <>:
>>> Thanks Stefan, but how will the process function get these watermarks?
>>> I have sliding windows as shown below:
>>> final DataStream<WindowStats> eventStream = inputStream
>>> .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>> .fold(new WindowStats(), new ProductAggregationMapper(),
>>> new ProductAggregationWindowFunction());
>>> Window results arrive every 5 minutes after the first window output. How
>>> would the process function know that all the windows for a tenant have
>>> finished for a given start and end time?
>>> Thanks for the help.
>>> Cheers,
>>> On 22 June 2017 at 14:37, Stefan Richter <>
>>> wrote:
>>>> Hi,
>>>> one possible approach is to have a process function before
>>>> the sink. The process function is aware of watermarks, so it can collect and
>>>> buffer window results until it sees a watermark. This is the signal that
>>>> all results for windows with an end time smaller than the watermark are
>>>> complete. They can then be aggregated and sent to the sink.
>>>> Best,
>>>> Stefan
>>>> > Am 22.06.2017 um 15:15 schrieb Ahmad Hassan <>:
>>>> >
>>>> > Hi All,
>>>> >
>>>> > I am using categoryID as the keyBy attribute to create a keyed stream
>>>> > from a product event stream. The keyed stream then creates time windows for
>>>> > each category. However, when the window time expires, I want to write the
>>>> > output data of all the products in all categories collectively in a single
>>>> > atomic operation. Is there a way to call a single sink function for all the
>>>> > windows with the same start and end time? Or is there a way in Flink to know
>>>> > that all windows with the same end time have finished processing their
>>>> > sink function?
>>>> >
>>>> > Currently, each window calls the sink function individually.
>>>> >
>>>> > cheers,
>>>> >
