You have to register an event-time timer in the `processElement()` method. Flink calls `onTimer()` once the function receives a watermark that passes the timestamp of the registered timer. So you can always register a timer for the end time of the next window and get a callback to `onTimer()` once all results for that window have been received. The ProcessFunction documentation explains the details [1].
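To make the buffering idea concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: the class `WatermarkBufferSketch` and its methods are hypothetical stand-ins that only simulate what a ProcessFunction would do with state and event-time timers, and the sketch assumes a timer fires once the watermark is at or past its timestamp. Timestamps are milliseconds of the day, so 43_200_000 stands for 12:00:00.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency) of "buffer window results until
// a watermark passes their end time". All names are hypothetical.
class WatermarkBufferSketch {

    // Buffered window results, grouped by window end time. Using the end time
    // as the key plays the role of registering one event-time timer per end time.
    private final TreeMap<Long, List<String>> bufferByEndTime = new TreeMap<>();

    // One batch per fired timer; stands in for what would be sent to the sink.
    private final List<List<String>> emittedBatches = new ArrayList<>();

    // Analogue of processElement(): buffer the result under its window end time.
    void processElement(long windowEndTime, String result) {
        bufferByEndTime.computeIfAbsent(windowEndTime, k -> new ArrayList<>()).add(result);
    }

    // Analogue of a watermark arriving: fire every timer at or before the watermark.
    void advanceWatermark(long watermark) {
        while (!bufferByEndTime.isEmpty() && bufferByEndTime.firstKey() <= watermark) {
            Map.Entry<Long, List<String>> fired = bufferByEndTime.pollFirstEntry();
            onTimer(fired.getKey(), fired.getValue());
        }
    }

    // Analogue of onTimer(): all results for this end time are complete,
    // so they can be written out together as one batch.
    void onTimer(long timestamp, List<String> results) {
        emittedBatches.add(results);
    }

    List<List<String>> emittedBatches() {
        return emittedBatches;
    }

    public static void main(String[] args) {
        WatermarkBufferSketch sketch = new WatermarkBufferSketch();
        sketch.processElement(43_200_000L, "A"); // window A ends at 12:00:00
        sketch.processElement(43_200_000L, "B"); // window B ends at 12:00:00
        sketch.advanceWatermark(43_199_000L);    // 11:59:59, nothing is complete yet
        sketch.advanceWatermark(43_201_000L);    // 12:00:01, A and B are emitted together
        System.out.println(sketch.emittedBatches());
    }
}
```

In a real job the buffer would live in Flink state and the timer would be registered through `ctx.timerService().registerEventTimeTimer(...)`; the sketch only shows why a single callback per end time is enough to write all results of that end time in one operation.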
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

2017-06-22 18:49 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:

> Hi Fabian,
>
> How will the process function be called at 12:00:01 if there are no
> window outputs or events after 12:00:00?
>
> Thanks
>
> On 22 Jun 2017, at 17:07, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Let's say window A and window B end at 12:00:00 and window C at 13:00:00.
> When the ProcessFunction receives a watermark at 12:00:01, it knows that
> windows A and B have finished.
> When it receives a watermark of 13:00:01, it knows that all results of
> window C have been received. If there were no records with timestamp
> 13:00:00, window C did not receive any data and therefore did not compute
> anything.
>
>
> 2017-06-22 17:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>
>> Thanks for the answers. My scenario is:
>>
>> | Window A |
>> | Window B |
>> | Window C |
>>
>> If no events are received for Window C, then how would the process
>> function know that both window 'A' and window 'B' have finished and need
>> to aggregate their results before the sink is called?
>>
>> Thanks
>>
>>
>> On 22 June 2017 at 16:27, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Ahmad,
>>>
>>> Flink's watermark mechanism guarantees that when you receive a watermark
>>> for time t all records with a timestamp smaller than t have been received
>>> as well.
>>> Records emitted from a window have the timestamp of their end time. So,
>>> when the ProcessFunction receives a watermark for 12:00:00 you can be sure
>>> that you also received all records for windows that closed before 12:00:00.
>>> The function should buffer all records it receives between watermarks as
>>> state and once it receives a watermark (triggering of a registered
>>> event-time timer) it should write the buffered records out.
>>>
>>> Btw. this only works for event time windows but not for processing time.
>>>
>>> Cheers, Fabian
>>>
>>> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> How would the process function know that the last window result has
>>>> arrived? Sliding windows slide every 5 minutes, which means that a
>>>> window for a new time range, or a new watermark, will arrive after 5 minutes.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> The process function has the signature
>>>>>
>>>>> void processElement(I value, Context ctx, Collector<O> out) throws
>>>>> Exception
>>>>>
>>>>> where the context provides access to the current watermark and also
>>>>> lets you register timer callbacks that trigger when a certain
>>>>> watermark is reached. You can simply monitor the watermark through the
>>>>> context for each incoming window result. *Start* time is not
>>>>> important, because you know that you have collected the results for all
>>>>> windows with a smaller *end* time than the watermark that you
>>>>> currently see in the context, because this is Flink's notion of
>>>>> completeness. This means you can prepare those windows, aggregate the
>>>>> results, and send them downstream to the sink.
>>>>>
>>>>> On 22.06.2017 at 15:46, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>
>>>>> Thanks Stefan, but how will the process function get these
>>>>> watermarks? I have sliding windows like below
>>>>>
>>>>> final DataStream<WindowStats> eventStream = inputStream
>>>>> .keyBy(TENANT, CATEGORY)
>>>>> .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>>>> .fold(new WindowStats(), new ProductAggregationMapper(), new
>>>>> ProductAggregationWindowFunction());
>>>>>
>>>>> Window results arrive every 5 minutes after the first window output.
>>>>> How would the process function know that all the windows for a tenant
>>>>> have finished for a given start and end time?
>>>>>
>>>>> Thanks for help.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> one possible approach could be that you have a process function
>>>>>> before the sink. A process function is aware of watermarks, so it can
>>>>>> collect and buffer window results until it sees a watermark. This is
>>>>>> the signal that all results for windows with an end time smaller than
>>>>>> the watermark are complete. They can then be aggregated and sent to
>>>>>> the sink.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> > On 22.06.2017 at 15:15, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi All,
>>>>>> >
>>>>>> > I am using categoryID as a keyBy attribute for creating a keyed
>>>>>> > stream from a product event stream. The keyed stream then creates
>>>>>> > time windows for each category. However, when the window time
>>>>>> > expires, I want to write the output data of all the products in all
>>>>>> > categories collectively in a single atomic operation. Is there a way
>>>>>> > to call a single sink function for all the windows with the same
>>>>>> > start and end time? Or is there a way in Flink to know that all
>>>>>> > windows with the same end time have finished processing their sink
>>>>>> > function?
>>>>>> >
>>>>>> > Currently, each window calls the sink function individually.
>>>>>> >
>>>>>> > cheers,