*Emit the intermediate accumulator state (AggregateFunction ACC value) of a session window when a new event arrives*
AggregateFunction#getResult() is called only when the window completes. I also need to emit the intermediate accumulator values (the result of each AggregateFunction#add() call) and write them to the sink. Both AggregateFunction#getResult() and ProcessWindowFunction provide the aggregated result only when the window is closed.

*Any thoughts on how to emit or stream the intermediate accumulator state as soon as a new event arrives, while the window is still open? Do I need to implement a custom Trigger or WindowAssigner?*

To give you some background: when a user watches a video we get events when the video is clicked, then roughly every 15 minutes, and finally when the user closes the video. I need to aggregate these events as soon as they arrive and post the result to the destination. For example, if a user watches a two-hour movie, I get an event at each 15-minute interval (0, 15, 30, ..., 120). Whenever I get an event, I need to aggregate the percentage watched so far and write it to the sink (0%, 12.5%, 25%, ..., 100%).

The implementation below emits only a single result (via getResult()), 20 minutes after the user has finished watching the video:

    .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
    .aggregate(new EventAggregator())
    .filter(new FinalFilter())
    .addSink(...)

Appreciate your help.

Thanks,
chandu
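
To make the expected output concrete, the following is a minimal plain-Java sketch of the per-event behaviour described above. It is not Flink code and says nothing about how a trigger should be configured; the class and method names (WatchProgress, watchedPercent, emitPerEvent) are hypothetical, and treating the accumulator as "furthest position seen so far" is an assumption about what EventAggregator computes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Flink dependency. All names are hypothetical.
class WatchProgress {

    // Percentage of the video watched at a given playback position.
    static double watchedPercent(int positionMinutes, int durationMinutes) {
        return 100.0 * positionMinutes / durationMinutes;
    }

    // Produces one output value per incoming event, i.e. what the sink
    // should receive immediately while the session is still open.
    static List<Double> emitPerEvent(int[] positionsMinutes, int durationMinutes) {
        List<Double> emitted = new ArrayList<>();
        // Assumed accumulator: the furthest point watched so far.
        double maxSoFar = 0.0;
        for (int position : positionsMinutes) {
            maxSoFar = Math.max(maxSoFar, watchedPercent(position, durationMinutes));
            emitted.add(maxSoFar); // emit the intermediate value right away
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Events every 15 minutes for a 120-minute movie.
        int[] positions = {0, 15, 30, 45, 60, 75, 90, 105, 120};
        System.out.println(emitPerEvent(positions, 120));
    }
}
```

Each element of the returned list corresponds to one record that should reach the sink, instead of the single record currently produced when the session window closes.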