Hi,

That looks perfect! I realized I could probably use an Evictor together with my ProcessWindowFunction to keep the window from holding on to all of its elements, but ditching the window altogether looks even better.
Thanks a lot!
William

----- Original Message -----
From: "Nico Kruber" <n...@data-artisans.com>
To: <user@flink.apache.org>
Cc: "William Saar" <will...@saar.se>
Sent: Tue, 20 Jun 2017 18:20:01 +0200
Subject: Re: Access to time in aggregation, or aggregation in ProcessWindowFunction?

Hi William,

I'm not quite sure what you are trying to achieve... What constitutes a "new event"? Is it based on some key? If so, you could group on that key, create a window, and use a custom trigger [1] instead. In the trigger you can react in onElement(), set up an event-time timer for the first element, and then react in onEventTime() when your timeout fires.

Depending on the details, though, a ProcessFunction [2] (without a window) looks like a better solution.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html

On Tuesday, 20 June 2017 12:52:38 CEST William Saar wrote:
> Hi,
>
> I am looking to implement a window that sends out updates for each new
> event it receives and also when an expiration timer fires and purges
> the window (the expiration time can be determined from a timestamp in
> the first event).
>
> I can't figure out a way to do this that does not require preserving
> all events in the window. It seems I would either need to be able to
> check the current watermark when an aggregation or its window function
> is evaluated, so that I can fire the final update when the timer fires,
> or I would need the ProcessWindowFunction (where I do have access to
> the time) not to preserve all elements in the window.
>
> The only way I've come up with is to use a ProcessWindowFunction that
> keeps state so that it only sends out updates for new elements in the
> elements iterable. The ProcessWindowFunction then also sends out an
> update when the first element's timestamp meets the expiration
> condition, or when the elements iterable contains no new elements
> (deducing that the invocation must have been triggered by a timer
> rather than by a new element). Is there a better way to do this?
>
> Thanks,
> William