Hi Fabian,

How will the process function be called at 12:00:01 if there is no window 
output and there are no events after 12:00:00? 

Thanks 

> On 22 Jun 2017, at 17:07, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Let's say window A and window B end at 12:00:00 and window C at 13:00:00.
> When the ProcessFunction receives a watermark of 12:00:01, it knows that 
> windows A and B have finished. 
> When it receives a watermark of 13:00:01, it knows that all results of window 
> C have been received. If there were no records with timestamp 13:00:00, 
> window C did not receive any data and therefore did not compute anything.
> 
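A plain-Java model of the completeness rule Fabian describes may help here; it has no Flink dependency. Given the end times of windows A, B and C and a watermark, it lists the windows whose results must all have arrived. It treats a window as complete once the watermark is strictly greater than its end time, as described in this thread. `WindowCompleteness` and `completedAt` are hypothetical names, not Flink API.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the watermark completeness rule described in this thread.
// WindowCompleteness and completedAt are hypothetical names, not Flink API.
class WindowCompleteness {

    // Returns the names of windows whose end time is strictly smaller than the
    // watermark, i.e. windows whose results must all have been received.
    static List<String> completedAt(Map<String, LocalTime> windowEnds, LocalTime watermark) {
        List<String> done = new ArrayList<>();
        for (Map.Entry<String, LocalTime> e : windowEnds.entrySet()) {
            if (e.getValue().isBefore(watermark)) {
                done.add(e.getKey());
            }
        }
        return done;
    }

    public static void main(String[] args) {
        // Windows A and B end at 12:00:00, window C at 13:00:00 (TreeMap keeps names sorted).
        Map<String, LocalTime> ends = new TreeMap<>();
        ends.put("A", LocalTime.of(12, 0, 0));
        ends.put("B", LocalTime.of(12, 0, 0));
        ends.put("C", LocalTime.of(13, 0, 0));

        System.out.println(completedAt(ends, LocalTime.of(12, 0, 1))); // [A, B]
        System.out.println(completedAt(ends, LocalTime.of(13, 0, 1))); // [A, B, C]
    }
}
```

Flink's own `Watermark` contract is that no further elements with a timestamp less than or equal to the watermark should arrive, so the strict comparison used here errs on the safe side.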
> 
> 2017-06-22 17:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>> Thanks for the answers. My scenario is:
>> 
>> | Window A |
>> | Window B |
>>                    | Window C |
>> 
>> If no events are received for Window C, then how would the process function 
>> know that both window 'A' and window 'B' have finished and that their results 
>> need to be aggregated before the sink is called?
>> 
>> Thanks
>> 
>> 
>>> On 22 June 2017 at 16:27, Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi Ahmad,
>>> 
>>> Flink's watermark mechanism guarantees that when you receive a watermark 
>>> for time t all records with a timestamp smaller than t have been received 
>>> as well.
>>> Records emitted from a window have the timestamp of their end time. So, if 
>>> the ProcessFunction receives a watermark for 12:00:00, you can be sure that 
>>> you have also received all records for windows that closed before 12:00:00.
>>> The function should buffer all records it receives between watermarks as 
>>> state and once it receives a watermark (triggering of a registered 
>>> event-time timer) it should write the buffered records out.
>>> 
>>> Btw., this only works for event-time windows, not for processing-time windows.
>>> 
>>> Cheers, Fabian
>>> 
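Fabian's buffer-and-flush pattern can be modelled in plain Java as follows (a sketch, not Flink code). In a real job the buffer would live in Flink keyed state and the flush would run when an event-time timer fires; here, as an assumption for illustration, `onWatermark` is simply called by hand with the new watermark value. `WatermarkBuffer`, `add`, `onWatermark` and `pending` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java model of "buffer records between watermarks, flush on watermark".
// In Flink the buffer would be keyed state and the flush would run in onTimer();
// WatermarkBuffer and its methods are hypothetical names used only for illustration.
class WatermarkBuffer<T> {
    // Buffered records, grouped by their (window end) timestamp in ascending order.
    private final TreeMap<Long, List<T>> buffer = new TreeMap<>();

    // Called for every incoming window result.
    void add(long timestamp, T record) {
        buffer.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(record);
    }

    // Called when the watermark advances: removes and returns every buffered record
    // whose timestamp is strictly smaller than the watermark, oldest first.
    List<T> onWatermark(long watermark) {
        NavigableMap<Long, List<T>> ready = buffer.headMap(watermark, false);
        List<T> out = new ArrayList<>();
        for (List<T> records : ready.values()) {
            out.addAll(records);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from buffer
        return out;
    }

    // Number of records still waiting for a watermark.
    int pending() {
        int n = 0;
        for (List<T> records : buffer.values()) {
            n += records.size();
        }
        return n;
    }

    public static void main(String[] args) {
        WatermarkBuffer<String> buf = new WatermarkBuffer<>();
        buf.add(100L, "A");
        buf.add(100L, "B");
        buf.add(200L, "C");
        System.out.println(buf.onWatermark(101L)); // [A, B]
        System.out.println(buf.pending());         // 1
        System.out.println(buf.onWatermark(201L)); // [C]
    }
}
```

A sorted map keeps the flush in timestamp order and turns "everything below the watermark" into a single `headMap` view that is removed with one `clear()` call.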
>>> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>>>> Hi Stefan,
>>>> 
>>>> How would the process function know that the last window result has 
>>>> arrived? The sliding windows slide every 5 minutes, which means that a window 
>>>> for a new time range, or a new watermark, will only arrive after 5 minutes. 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>>> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com> 
>>>>> wrote:
>>>>> The process function has the signature
>>>>> 
>>>>> void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>>>> 
>>>>> where the context provides access to the current watermark, and you can 
>>>>> also register timer callbacks that fire when a certain watermark is 
>>>>> reached. You can simply monitor the watermark through the context for each 
>>>>> incoming window result. The start time is not important, because you know 
>>>>> that you have collected the results for all windows whose end time is 
>>>>> smaller than the watermark you currently see in the context; this is 
>>>>> Flink's notion of completeness. This means you can prepare those windows, 
>>>>> aggregate their results, and send them downstream to the sink.
>>>>> 
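The timer callbacks Stefan mentions fire when the watermark advances, not when a new window result arrives. The plain-Java model below sketches that behaviour: a timer registered for time t fires once the watermark reaches t, and registering the same timestamp twice yields a single timer, mirroring the per-key, per-timestamp deduplication of Flink's timer service. `SimpleTimerService` is a hypothetical stand-in, not Flink's `TimerService` itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of event-time timers: a timer registered for time t fires once the
// watermark reaches t. SimpleTimerService is a hypothetical class, not Flink API.
class SimpleTimerService {
    private long currentWatermark = Long.MIN_VALUE;
    // A sorted set: registering the same timestamp twice yields a single timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    // Advances the (monotonic) watermark and returns the timers that fire, in ascending order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        SimpleTimerService ts = new SimpleTimerService();
        // As in processElement(): register a timer at the end of each incoming window result.
        ts.registerEventTimeTimer(300L);
        ts.registerEventTimeTimer(300L); // duplicate, still one timer
        ts.registerEventTimeTimer(600L);
        System.out.println(ts.advanceWatermark(299L));  // []
        System.out.println(ts.advanceWatermark(300L));  // [300]
        System.out.println(ts.advanceWatermark(1000L)); // [600]
        System.out.println(ts.currentWatermark());      // 1000
    }
}
```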
>>>>>> On 22.06.2017 at 15:46, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>> 
>>>>>> Thanks Stefan, but how will the process function receive these watermarks? 
>>>>>> I have sliding windows like the one below:
>>>>>> 
>>>>>> final DataStream<WindowStats> eventStream = inputStream
>>>>>>         .keyBy(TENANT, CATEGORY)
>>>>>>         // 100-minute windows that slide every 5 minutes (processing time)
>>>>>>         .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>>>>>         .fold(new WindowStats(), new ProductAggregationMapper(),
>>>>>>               new ProductAggregationWindowFunction());
>>>>>> 
>>>>>> After the first window output, window results arrive every 5 minutes. How 
>>>>>> would the process function know that all the windows for a tenant have 
>>>>>> finished for a given start and end time?
>>>>>> 
>>>>>> Thanks for help.
>>>>>> 
>>>>>> Cheers,
>>>>>> 
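To see why results arrive every 5 minutes, the plain-Java sketch below computes which 100-minute windows, sliding every 5 minutes, contain a given timestamp. As far as I know it follows the start-time arithmetic of Flink's sliding window assigners; a zero offset and non-negative timestamps are assumed, and `SlidingWindowMath` and `windowsFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding window assignment (zero offset, non-negative timestamps).
// SlidingWindowMath is a hypothetical helper, not Flink API.
class SlidingWindowMath {
    static final long MINUTE = 60_000L;

    // Returns the [start, end) intervals, in milliseconds, of every window containing the timestamp.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 102 * MINUTE; // an event at minute 102
        List<long[]> ws = windowsFor(ts, 100 * MINUTE, 5 * MINUTE);
        System.out.println(ws.size()); // 20
        // Window ends are 5 minutes apart, so one of this key's windows closes every 5 minutes.
        for (long[] w : ws) {
            System.out.println((w[0] / MINUTE) + ".." + (w[1] / MINUTE));
        }
    }
}
```

Each element lands in 100 / 5 = 20 overlapping windows whose ends are 5 minutes apart, so a window per key closes every 5 minutes. The same arithmetic applies to event-time sliding windows, which the watermark-based approach in this thread requires.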
>>>>>>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com> 
>>>>>>> wrote:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> One possible approach is to put a process function before the sink. A 
>>>>>>> process function is aware of watermarks, so it can collect and buffer 
>>>>>>> window results until it sees a watermark. This is the signal that all 
>>>>>>> results for windows with an end time smaller than the watermark are 
>>>>>>> complete. They can then be aggregated and sent to the sink.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>> 
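Stefan's suggestion, which also covers the original question of one sink write for all windows with the same end time, can be modelled in plain Java like this. Window results from all categories are buffered by window end time; when the watermark passes an end time, the results for that end time are merged into a single batch. In Flink, the buffer would be state and the flush an event-time timer; since timers need a keyed stream, keying the window results by their end time is one way to bring all categories of the same window together, but that choice is an assumption of this sketch. `BatchingBeforeSink`, `WindowResult` and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a "process function before the sink": per-category window
// results are buffered by window end time, and once the watermark passes that end
// time, all results for it are emitted together as one batch (one sink write).
// BatchingBeforeSink and WindowResult are hypothetical names, not Flink API.
class BatchingBeforeSink {

    // One window result for one category (key) and one window.
    static final class WindowResult {
        final String category;
        final long windowEnd;
        final long count;

        WindowResult(String category, long windowEnd, long count) {
            this.category = category;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    // Buffered results, grouped by window end time in ascending order.
    private final TreeMap<Long, List<WindowResult>> byEnd = new TreeMap<>();

    // Corresponds to processElement(): just buffer the result.
    void onWindowResult(WindowResult r) {
        byEnd.computeIfAbsent(r.windowEnd, end -> new ArrayList<>()).add(r);
    }

    // Corresponds to a timer firing: every end time strictly below the watermark is
    // complete, so emit one batch (category -> count) per completed end time.
    List<Map<String, Long>> onWatermark(long watermark) {
        List<Map<String, Long>> batches = new ArrayList<>();
        while (!byEnd.isEmpty() && byEnd.firstKey() < watermark) {
            Map<String, Long> batch = new TreeMap<>();
            for (WindowResult r : byEnd.pollFirstEntry().getValue()) {
                batch.merge(r.category, r.count, Long::sum);
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        BatchingBeforeSink op = new BatchingBeforeSink();
        op.onWindowResult(new WindowResult("books", 100L, 3));
        op.onWindowResult(new WindowResult("toys", 100L, 5));
        op.onWindowResult(new WindowResult("books", 200L, 1));
        System.out.println(op.onWatermark(101L)); // [{books=3, toys=5}]
        System.out.println(op.onWatermark(201L)); // [{books=1}]
    }
}
```

Each emitted map would be handed to the sink as one unit, so all categories of a window are written together instead of one sink call per window.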
>>>>>>> > On 22.06.2017 at 15:15, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi All,
>>>>>>> >
>>>>>>> > I am using categoryID as the keyBy attribute to create a keyed stream 
>>>>>>> > from a product event stream. The keyed stream then creates time windows 
>>>>>>> > for each category. However, when the window time expires, I want to 
>>>>>>> > write the output data of all the products in all categories in a 
>>>>>>> > single atomic operation. Is there a way to call a single sink function 
>>>>>>> > for all the windows with the same start and end time? Or is there a 
>>>>>>> > way in Flink to know that all windows with the same end time have 
>>>>>>> > finished processing their sink function?
>>>>>>> >
>>>>>>> > Currently, each window calls sink function individually.
>>>>>>> >
>>>>>>> > cheers,
>>>>>>> >
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
