The process function has the signature

void processElement(I value, Context ctx, Collector<O> out) throws Exception
The context provides access to the current watermark, and you can also
register timer callbacks that trigger when a certain watermark is reached.
You can simply monitor the watermark through the context for each incoming
window result. The start time is not important: you know that you have
collected the results for all windows whose end time is smaller than the
watermark you currently see in the context, because this is Flink's notion
of completeness. This means you can prepare those windows, aggregate their
results, and send them downstream to the sink.
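For illustration, a minimal sketch of such a buffering function could look
like the one below. It assumes event-time semantics (e.g. using
SlidingEventTimeWindows rather than processing-time windows, so that
watermarks actually carry completeness information) and a hypothetical
getWindowEnd() accessor on WindowStats; the buffer is kept on the heap for
brevity instead of in checkpointed state:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers incoming window results and releases all results that share a
// window end time once the current watermark has passed that end time.
public class WindowResultCollector
        extends ProcessFunction<WindowStats, List<WindowStats>> {

    // Pending results grouped by window end time, oldest first.
    private final TreeMap<Long, List<WindowStats>> pending = new TreeMap<>();

    @Override
    public void processElement(WindowStats value, Context ctx,
                               Collector<List<WindowStats>> out) throws Exception {
        // Buffer the incoming window result under its window end time
        // (getWindowEnd() is assumed to exist on WindowStats).
        pending.computeIfAbsent(value.getWindowEnd(), k -> new ArrayList<>())
               .add(value);

        // Monitor the watermark: every buffered window whose end time is
        // not larger than the watermark is complete and can be emitted.
        long watermark = ctx.timerService().currentWatermark();
        NavigableMap<Long, List<WindowStats>> complete =
                pending.headMap(watermark, true);
        for (List<WindowStats> sameEndTime : complete.values()) {
            out.collect(sameEndTime); // one batch per window end time
        }
        complete.clear();
    }
}

The sink then receives one List<WindowStats> per window end time (e.g. via
eventStream.process(new WindowResultCollector()).addSink(...)) and can write
it in a single atomic operation. Note that to see the results of all keys,
this function would have to run with parallelism 1, or the window results
would have to be re-keyed by their end time first.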

> On 22.06.2017, at 15:46, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
> 
> Thanks Stefan, But how the Process function will have these watermarks? I 
> have sliding windows like below
> 
> final DataStream<WindowStats> eventStream = inputStream
>     .keyBy(TENANT, CATEGORY)
>     .window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>     .fold(new WindowStats(), new ProductAggregationMapper(),
>           new ProductAggregationWindowFunction());
> 
> Window results arrive every 5 minutes after the first window output. How
> would the process function know that all the windows for a tenant have
> finished for a given start and end time?
> 
> Thanks for help.
> 
> Cheers,
> 
> On 22 June 2017 at 14:37, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
> 
> one possible approach could be to have a process function before the
> sink. The process function is aware of watermarks, so it can collect and
> buffer window results until it sees a watermark. This is the signal that
> all results for windows with an end time smaller than the watermark are
> complete. They can then be aggregated and sent to the sink.
> 
> Best,
> Stefan
> 
> > On 22.06.2017, at 15:15, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
> >
> > Hi All,
> >
> > I am using categoryID as a keyBy attribute for creating a keyed stream
> > from a product event stream. The keyed stream then creates time windows
> > for each category. However, when a window's time expires, I want to write
> > the output data of all the products in all categories collectively, in a
> > single atomic operation. Is there a way to call a single sink function
> > for all the windows with the same start and end time? Or is there a way
> > in Flink to know that all windows with the same end time have finished
> > processing their sink function?
> >
> > Currently, each window calls sink function individually.
> >
> > cheers,
> >
