Hi Ahmad,

Flink's watermark mechanism guarantees that when you receive a watermark
for time t, all records with a timestamp smaller than t have been received
as well.
Records emitted from a window carry the timestamp of the window's end time.
So, when the ProcessFunction receives a watermark for 12:00:00, you can be
sure that it has also received all records for windows that closed before
12:00:00.
The function should buffer all records it receives between watermarks as
state, and once a watermark arrives (i.e., a registered event-time timer
fires) it should write the buffered records out.

Btw. this only works for event-time windows, not for processing-time windows.
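For completeness, switching the windowing pipeline quoted below to event time would look roughly like this. This is only a sketch against Flink 1.3-era APIs; `ProductEvent` and its `getEventTime()` accessor are hypothetical placeholders for the actual input type:

```java
// Sketch only: event-time variant of the sliding-window pipeline.
// Requires env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime).
DataStream<WindowStats> eventStream = inputStream
    // Assign event timestamps and emit watermarks that lag at most
    // 10 seconds behind the highest timestamp seen so far.
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<ProductEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(ProductEvent event) {
                // Hypothetical accessor; replace with the real event-time field.
                return event.getEventTime();
            }
        })
    .keyBy(TENANT, CATEGORY)
    // Event-time sliding windows instead of processing-time ones.
    .window(SlidingEventTimeWindows.of(Time.minutes(100), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(),
          new ProductAggregationWindowFunction());
```

With this setup, the window results carry event-time timestamps, so a downstream ProcessFunction can rely on watermarks as described above.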

Cheers, Fabian

2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:

> Hi Stefan,
>
> How would the process function know that the last window result has arrived?
> Sliding windows slide every 5 minutes, which means that a window for a new
> time range, and hence a new watermark, will only arrive after 5 minutes.
>
> Thanks
>
>
> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
>> The process function has the signature
>>
>> void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>
>> where the context is providing access to the current watermark and you
>> can also register timer callbacks that trigger when a certain
>> watermark is reached. You can simply monitor the watermark through the
>> context for each incoming window result. *Start* time is not important,
>> because you know that you have collected the results for all windows with a
>> smaller *end* time than the watermark that you currently see in the
>> context, because this is Flink’s notion of completeness. This means you can
>> prepare those windows and aggregate results and send them downstream to the
>> sink.
>>
>> Am 22.06.2017 um 15:46 schrieb Ahmad Hassan <ahmad.has...@gmail.com>:
>>
>> Thanks Stefan. But how will the process function get these watermarks?
>> I have sliding windows like below:
>>
>> final DataStream<WindowStats> eventStream = inputStream
>>     .keyBy(TENANT, CATEGORY)
>>     .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>     .fold(new WindowStats(), new ProductAggregationMapper(),
>>           new ProductAggregationWindowFunction());
>>
>> Window results are coming every 5 minutes after the first window output. How
>> would the process function know that all the windows for a tenant have
>> finished for a given start and end time?
>>
>> Thanks for help.
>>
>> Cheers,
>>
>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> one possible approach could be that you have a process function before
>>> the sink. Process function is aware of watermarks, so it can collect and
>>> buffer window results until it sees a watermark. This is the signal that
>>> all results for windows with an end time smaller than the watermark are
>>> complete. They can then be aggregated and sent to the sink.
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 22.06.2017 um 15:15 schrieb Ahmad Hassan <ahmad.has...@gmail.com>:
>>> >
>>> > Hi All,
>>> >
>>> > I am using categoryID as a keyby attribute for creating keyed stream
>>> from a product event stream. Keyed stream then creates time windows for
>>> each category. However, when the window time expires, I want to write the
>>> output data of all the products in all categories in a single atomic
>>> operation. Is there a way to call a single sink function for all the
>>> windows with the same start and end time? Or is there a way in Flink to
>>> know that all windows with the same end time have finished processing their
>>> sink function?
>>> >
>>> > Currently, each window calls sink function individually.
>>> >
>>> > cheers,
>>> >
>>>
>>>
>>
>>
>
