You have to register an event-time timer in the `processElement()` method.
You'll get a callback to `onTimer()` when the function receives a watermark
that is greater than the registered timer. So you can always register a
timer for the end time of the next window to get a call back to `onTimer()`
when all results for a window have been received. The documentation of the
ProcessFunction explains details [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

2017-06-22 18:49 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:

> Hi Fabian,
>
> How the process function will be called at 12:00:01 as there are no
> windows output or events after 12:00:00.
>
> Thanks
>
> On 22 Jun 2017, at 17:07, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Let's say window A and window B end at 12:00:00 and window C at 13:00:00.
> When the ProcessFunction receives a watermark at 12:00:01, it knows that
> Window A and B have been finished.
> When it receives a watermark of 13:00:01 it knows that all results of
> window C have been received. If there were no records with timestamp
> 13:00:00, window C did not receive any data and didn't there not compute
> anything.
>
>
> 2017-06-22 17:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>
>> Thanks for the answers. My scenario is:
>>
>> | Window A |
>> | Window B |
>>                    | Window C |
>>
>> If no events are received for Window C, then how process function would
>> know that both window 'A' and window 'B' have finished and need to
>> aggregated their result before sink is called?
>>
>> Thanks
>>
>>
>> On 22 June 2017 at 16:27, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Ahmad,
>>>
>>> Flink's watermark mechanism guarantees that when you receive a watermark
>>> for time t all records with a timestamp smaller than t have been received
>>> as well.
>>> Records emitted from a window have the timestamp of their end time. So,
>>> the ProcessFunction receives a timestamp for 12:00:00 you can be sure that
>>> you also received all records for windows that closed before 12:00:00.
>>> The function should buffer all records it receives between watermarks as
>>> state and once it receives a watermark (triggering of a registered
>>> event-time timer) it should write the buffered records out.
>>>
>>> Btw. this only works for event time windows but not for processing time.
>>>
>>> Cheers, Fabian
>>>
>>> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> How process function would know that the last window result has
>>>> arrived? Because slidingwindows slide every 5 minutes which means that
>>>> window of new time-range or new watermark will arrive after 5 minutes.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> The process function has the signature
>>>>>
>>>>> void processElement(I value, Context ctx, Collector<O> out) throws 
>>>>> Exception
>>>>>
>>>>> where the context is providing access to the current watermark and you
>>>>> can also register timer callbacks, when that trigger when a certain
>>>>> watermark is reached. You can simply monitor the watermark through the
>>>>> context for each incoming window result. *Start* time is not
>>>>> important, because you know that you have collected the results for all
>>>>> windows with a smaller *end* time than the watermark that you
>>>>> currently see in the context, because this is Flinkā€™s notion of
>>>>> completeness. This means you can prepare those windows and aggregate
>>>>> results and send them downstream to the sink.
>>>>>
>>>>> Am 22.06.2017 um 15:46 schrieb Ahmad Hassan <ahmad.has...@gmail.com>:
>>>>>
>>>>> Thanks Stefan, But how the Process function will have these
>>>>> watermarks? I have sliding windows like below
>>>>>
>>>>> final DataStream<WindowStats> eventStream = inputStream
>>>>> .keyBy(TENANT, CATEGORY)
>>>>> .window(SlidingProcessingTimeWindows.of(Time.minutes(100,Time.minutes(
>>>>> 5)))
>>>>> .fold(new WindowStats(), new ProductAggregationMapper(), new
>>>>> ProductAggregationWindowFunction());
>>>>>
>>>>> Window results are coming every 5 minutes after first window output.
>>>>> How the process function would know that all the windows for a Tenant have
>>>>> finished for a giving start and end time.
>>>>>
>>>>> Thanks for help.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> one possible approach could be that you have a process function
>>>>>> before the sink. Process function is aware of watermarks, so it can 
>>>>>> collect
>>>>>> and buffer window results until it sees a watermark. This is the signal
>>>>>> that all results for windows with an end time smaller than the watermark
>>>>>> are complete. They can then be aggregated and send to the sink.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> > Am 22.06.2017 um 15:15 schrieb Ahmad Hassan <ahmad.has...@gmail.com
>>>>>> >:
>>>>>> >
>>>>>> > Hi All,
>>>>>> >
>>>>>> > I am using categoryID as a keyby attribute for creating keyed
>>>>>> stream from a product event stream. Keyed stream then creates time 
>>>>>> windows
>>>>>> for each category. However, when the window time expires, i want to write
>>>>>> the output data of all the products in all all categories in a single
>>>>>> atomic operation collectively. Is there a way to call a single sink
>>>>>> function for all the windows with same start and end time. Or is there a
>>>>>> way in flink to know that all windows with same end time have finished
>>>>>> processing their sink function?
>>>>>> >
>>>>>> > Currently, each window calls sink function individually.
>>>>>> >
>>>>>> > cheers,
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to