Let's say window A and window B end at 12:00:00 and window C ends at 13:00:00. When the ProcessFunction receives a watermark of 12:00:01, it knows that windows A and B have finished. When it receives a watermark of 13:00:01, it knows that all results of window C have been received. If there were no records with timestamp 13:00:00, window C did not receive any data and therefore did not compute anything.
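The timeline above can be modeled in a few lines of plain Java. This is a minimal sketch with no Flink dependency, so it runs on its own. It assumes the ProcessFunction buffers window results grouped by their window end time and, on each watermark, releases every window whose end time is not later than that watermark. The class and method names (WatermarkFlushExample, onWindowResult, onWatermark) are hypothetical and are not part of Flink's API. In a real job the buffer would be kept as Flink state and the release would happen when a registered event-time timer fires.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the buffering logic (not Flink code).
class WatermarkFlushExample {

    // Buffered window results, grouped and sorted by window end time.
    private final TreeMap<LocalTime, List<String>> buffer = new TreeMap<>();

    // Called for every incoming window result (processElement in Flink terms).
    void onWindowResult(LocalTime windowEnd, String result) {
        buffer.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(result);
    }

    // Called when a watermark arrives: returns and removes the results of all
    // windows whose end time is <= the watermark, i.e. windows that are complete.
    List<String> onWatermark(LocalTime watermark) {
        List<String> complete = new ArrayList<>();
        Map<LocalTime, List<String>> ready = buffer.headMap(watermark, true);
        for (List<String> results : ready.values()) {
            complete.addAll(results);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from buffer
        return complete;
    }

    public static void main(String[] args) {
        WatermarkFlushExample fn = new WatermarkFlushExample();
        fn.onWindowResult(LocalTime.of(12, 0), "A");
        fn.onWindowResult(LocalTime.of(12, 0), "B");
        // Window C ends at 13:00:00 but received no data, so it never emits a result.

        System.out.println(fn.onWatermark(LocalTime.of(12, 0, 1))); // [A, B]
        System.out.println(fn.onWatermark(LocalTime.of(13, 0, 1))); // []
    }
}
```

If window C had produced a result, it would stay in the buffer after the 12:00:01 watermark and only be released by the 13:00:01 watermark.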
2017-06-22 17:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
> Thanks for the answers. My scenario is:
>
> | Window A |
> | Window B |
> | Window C |
>
> If no events are received for Window C, how would the process function
> know that both window A and window B have finished and that their results
> need to be aggregated before the sink is called?
>
> Thanks
>
>
> On 22 June 2017 at 16:27, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Ahmad,
>>
>> Flink's watermark mechanism guarantees that when you receive a watermark
>> for time t, all records with a timestamp smaller than t have been received
>> as well.
>> Records emitted from a window have the timestamp of their end time. So,
>> when the ProcessFunction receives a watermark for 12:00:00, you can be sure
>> that you have also received all records for windows that closed before 12:00:00.
>> The function should buffer all records it receives between watermarks as
>> state, and once it receives a watermark (triggering a registered
>> event-time timer) it should write the buffered records out.
>>
>> Btw, this only works for event-time windows, not for processing-time windows.
>>
>> Cheers, Fabian
>>
>> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>>
>>> Hi Stefan,
>>>
>>> How would the process function know that the last window result has arrived?
>>> The sliding windows slide every 5 minutes, which means that a window for a new
>>> time range, or a new watermark, will only arrive after 5 minutes.
>>>
>>> Thanks
>>>
>>>
>>> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com>
>>> wrote:
>>>
>>>> The process function has the signature
>>>>
>>>>     void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>>>
>>>> where the context provides access to the current watermark, and you
>>>> can also register timer callbacks that trigger when a certain
>>>> watermark is reached. You can simply monitor the watermark through the
>>>> context for each incoming window result.
>>>> *Start* time is not important, because you know that you have collected
>>>> the results for all windows with a smaller *end* time than the watermark
>>>> that you currently see in the context, because this is Flink's notion of
>>>> completeness. This means you can prepare those windows, aggregate the
>>>> results, and send them downstream to the sink.
>>>>
>>>> On 22.06.2017 at 15:46, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>
>>>> Thanks Stefan, but how will the process function get these
>>>> watermarks? I have sliding windows like the ones below:
>>>>
>>>>     final DataStream<WindowStats> eventStream = inputStream
>>>>         .keyBy(TENANT, CATEGORY)
>>>>         .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>>>         .fold(new WindowStats(), new ProductAggregationMapper(),
>>>>             new ProductAggregationWindowFunction());
>>>>
>>>> Window results arrive every 5 minutes after the first window output.
>>>> How would the process function know that all the windows for a tenant
>>>> have finished for a given start and end time?
>>>>
>>>> Thanks for the help.
>>>>
>>>> Cheers,
>>>>
>>>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> one possible approach could be to have a process function before
>>>>> the sink. A process function is aware of watermarks, so it can collect and
>>>>> buffer window results until it sees a watermark. This is the signal that
>>>>> all results for windows with an end time smaller than the watermark are
>>>>> complete. They can then be aggregated and sent to the sink.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> > On 22.06.2017 at 15:15, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>> >
>>>>> > Hi All,
>>>>> >
>>>>> > I am using categoryID as a keyBy attribute for creating a keyed stream
>>>>> > from a product event stream. The keyed stream then creates time windows
>>>>> > for each category.
>>>>> > However, when the window time expires, I want to write the
>>>>> > output data of all the products in all categories in a single atomic
>>>>> > operation. Is there a way to call a single sink function for
>>>>> > all the windows with the same start and end time? Or is there a way in
>>>>> > Flink to know that all windows with the same end time have finished
>>>>> > processing their sink function?
>>>>> >
>>>>> > Currently, each window calls the sink function individually.
>>>>> >
>>>>> > cheers,
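The approach suggested in this thread (buffer the window results, then write out everything whose end time the watermark has passed) can be sketched in plain Java as a way to get one sink call per window end time covering all categories. This is a hypothetical model, not Flink code: BatchingBuffer, WindowResult, add and onWatermark are illustrative names, and the sink is a plain BiConsumer standing in for a real sink. Times are plain long values for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical model: buffer per-category window results and hand all results
// that share a window end time to the sink in a single call.
class BatchingBuffer {

    // One per-category window result (hypothetical type for illustration).
    static final class WindowResult {
        final String category;
        final long windowEnd; // window end time, e.g. epoch millis
        final long count;     // aggregated value for this category and window

        WindowResult(String category, long windowEnd, long count) {
            this.category = category;
            this.windowEnd = windowEnd;
            this.count = count;
        }

        @Override
        public String toString() {
            return category + "=" + count;
        }
    }

    // Pending results grouped by window end time, smallest end time first.
    private final TreeMap<Long, List<WindowResult>> pending = new TreeMap<>();
    // Receives (windowEnd, all results for that end time) in one call.
    private final BiConsumer<Long, List<WindowResult>> sink;

    BatchingBuffer(BiConsumer<Long, List<WindowResult>> sink) {
        this.sink = sink;
    }

    // Corresponds to processElement: only buffer the result.
    void add(WindowResult result) {
        pending.computeIfAbsent(result.windowEnd, k -> new ArrayList<>()).add(result);
    }

    // Corresponds to an event-time timer firing: every window whose end time is
    // <= the watermark is complete, so emit one batch per end time, oldest first.
    void onWatermark(long watermark) {
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            Map.Entry<Long, List<WindowResult>> batch = pending.pollFirstEntry();
            sink.accept(batch.getKey(), batch.getValue());
        }
    }

    public static void main(String[] args) {
        BatchingBuffer buffer = new BatchingBuffer(
            (end, results) -> System.out.println("write end=" + end + " " + results));

        buffer.add(new WindowResult("books", 100, 3));
        buffer.add(new WindowResult("games", 100, 5));
        buffer.add(new WindowResult("books", 105, 1));

        buffer.onWatermark(100); // write end=100 [books=3, games=5]
        buffer.onWatermark(105); // write end=105 [books=1]
    }
}
```

Two caveats follow from the thread itself. As Fabian notes, the watermark signal only exists for event time, so the SlidingProcessingTimeWindows in the snippet above would have to be event-time windows (for example SlidingEventTimeWindows) for this to apply. And if such logic runs with parallelism greater than 1, each parallel instance only sees part of the categories, so a single batch per end time assumes that all window results reach the same instance.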