Hi Ahmad,

Flink's watermark mechanism guarantees that when you receive a watermark for time t, all records with a timestamp smaller than t have been received as well. Records emitted from a window carry the timestamp of the window's end time. So, if the ProcessFunction receives a watermark for 12:00:00, you can be sure that you have also received all records for windows that closed before 12:00:00. The function should buffer all records it receives between watermarks in state and, once it receives a watermark (i.e., when a registered event-time timer fires), write the buffered records out.
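Fabian's buffer-and-flush idea can be modelled without any Flink dependency. The plain-Java sketch below is an illustration under stated assumptions, not Flink code: `WindowResult`, `WatermarkBuffer`, `onWindowResult` and `onWatermark` are hypothetical names, the `TreeMap` stands in for Flink state, and `onWatermark` plays the role of the event-time timer callback. Results are grouped by timestamp, so each flush yields one batch per window end, which is the collective per-window write Ahmad asked for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class WatermarkBufferDemo {

    // Hypothetical window output: one aggregate per category and window.
    static final class WindowResult {
        final String category;
        final long timestamp; // timestamp the window result carries (tied to the window end)
        final long count;

        WindowResult(String category, long timestamp, long count) {
            this.category = category;
            this.timestamp = timestamp;
            this.count = count;
        }
    }

    // Buffers window results between watermarks and releases them in batches.
    static final class WatermarkBuffer {
        // timestamp -> every result carrying that timestamp (i.e. the same window end)
        private final TreeMap<Long, List<WindowResult>> pending = new TreeMap<>();

        void onWindowResult(WindowResult result) {
            pending.computeIfAbsent(result.timestamp, t -> new ArrayList<>()).add(result);
        }

        // Stand-in for an event-time timer: after watermark w no more results
        // with timestamp <= w are expected, so those groups are complete.
        List<List<WindowResult>> onWatermark(long watermark) {
            NavigableMap<Long, List<WindowResult>> ready = pending.headMap(watermark, true);
            List<List<WindowResult>> batches = new ArrayList<>(ready.values());
            ready.clear(); // headMap is a view, so this also removes the entries from 'pending'
            return batches;
        }
    }

    public static void main(String[] args) {
        WatermarkBuffer buffer = new WatermarkBuffer();
        buffer.onWindowResult(new WindowResult("books", 300, 4));
        buffer.onWindowResult(new WindowResult("toys", 300, 7));
        buffer.onWindowResult(new WindowResult("books", 600, 2));

        // At watermark 300 only the results with timestamp 300 are complete.
        for (List<WindowResult> batch : buffer.onWatermark(300)) {
            System.out.println("write " + batch.size() + " results for ts=" + batch.get(0).timestamp);
        }
        // Nothing else is complete yet.
        System.out.println(buffer.onWatermark(450).size() + " batches at watermark 450");
    }
}
```

In a real job the same logic would presumably sit in a ProcessFunction placed after the window operator, as Stefan suggests below, with each batch handed to the sink in a single write.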
Btw, this only works for event-time windows, not for processing-time windows.

Cheers,
Fabian

2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.has...@gmail.com>:

> Hi Stefan,
>
> How would the process function know that the last window result has arrived?
> The sliding windows slide every 5 minutes, which means that the window for a new
> time range, or a new watermark, will arrive 5 minutes later.
>
> Thanks
>
> On 22 June 2017 at 15:10, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> The process function has the signature
>>
>> void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>
>> where the context provides access to the current watermark, and you can also
>> register timer callbacks that trigger when a certain watermark is reached. You
>> can simply monitor the watermark through the context for each incoming window
>> result. The *start* time is not important, because you know that you have
>> collected the results for all windows with an *end* time smaller than the
>> watermark you currently see in the context; this is Flink's notion of
>> completeness. This means you can prepare those windows, aggregate the results,
>> and send them downstream to the sink.
>>
>> On 22.06.2017 at 15:46, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>
>> Thanks Stefan, but how will the process function receive these watermarks?
>> I have sliding windows like below:
>>
>> final DataStream<WindowStats> eventStream = inputStream
>>     .keyBy(TENANT, CATEGORY)
>>     .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>     .fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());
>>
>> Window results arrive every 5 minutes after the first window output. How would
>> the process function know that all the windows for a tenant have finished for
>> a given start and end time?
>>
>> Thanks for the help.
>>
>> Cheers,
>>
>> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> One possible approach could be to put a process function before the sink.
>>> The process function is aware of watermarks, so it can collect and buffer
>>> window results until it sees a watermark. This is the signal that all results
>>> for windows with an end time smaller than the watermark are complete. They can
>>> then be aggregated and sent to the sink.
>>>
>>> Best,
>>> Stefan
>>>
>>> > On 22.06.2017 at 15:15, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I am using categoryID as the keyBy attribute to create a keyed stream from a
>>> > product event stream. The keyed stream then creates time windows for each
>>> > category. However, when the window time expires, I want to write the output
>>> > data of all the products in all categories collectively, in a single atomic
>>> > operation. Is there a way to call a single sink function for all the windows
>>> > with the same start and end time? Or is there a way in Flink to know that all
>>> > windows with the same end time have finished processing their sink function?
>>> >
>>> > Currently, each window calls the sink function individually.
>>> >
>>> > Cheers,
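Why a single watermark is enough to know that *all* categories have finished a window, even when the keyed windows run in several parallel instances: Flink advances an operator's event time to the minimum of the watermarks received on its input channels, so a downstream function only sees watermark t after every upstream window instance has passed t and emitted its results up to t. The plain-Java sketch below models only that rule. `MinWatermarkTracker` and `onWatermark` are hypothetical names rather than Flink API, and the sketch assumes all window results are routed to one downstream instance (for example, a function running with parallelism 1).

```java
import java.util.Arrays;

public class MinWatermarkDemo {

    // Tracks the watermark of each input channel and derives the operator's
    // event-time clock as the minimum over all channels.
    static final class MinWatermarkTracker {
        private final long[] channelWatermarks;
        private long current = Long.MIN_VALUE;

        MinWatermarkTracker(int channels) {
            channelWatermarks = new long[channels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no channel has reported yet
        }

        // Records a watermark from one upstream instance and returns the
        // combined watermark, which never moves backwards.
        long onWatermark(int channel, long watermark) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            current = Math.max(current, min);
            return current;
        }
    }

    public static void main(String[] args) {
        // Two upstream window instances, e.g. one handling "books", one handling "toys".
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        // Still Long.MIN_VALUE: channel 1 has not reached 300 yet.
        System.out.println(tracker.onWatermark(0, 300));
        // Both channels are at 300, so every window result up to 300 has arrived.
        System.out.println(tracker.onWatermark(1, 300));
    }
}
```

Because the combined watermark waits for the slowest channel, the buffering function never flushes a window end before every category's result for it is in.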