*Emit intermediate accumulator(AggregateFunction ACC value) state of a
session window when new event arrives*



AggregateFunction#getResults() is called only when window completes. My
need is emit intermediate accumulator values(result of
AggregateFunction#add()) as well and write them to Sink. Both
AggregateFunction#getResult() and ProcessWindowFunction() provides
aggregated result, only when the window is closed.

*Any thoughts please, how to emit or stream intermediate accumulator state
as soon as new event arrive when window is open? Need to implement custom
trigger or Assigner?*



To give you some background, when user watches a video we get events - when
clicked, thereafter every ~ 15minutes, and finally when user close the
video.

I need to aggregate them as soon as they arrive and post it to destination.
For example, if user watching a two-hour movie I get events for 15 min
interval(0,15,30,...,120), whenever I get a event need to aggregate watched
percentage so far and write it to sink(0%, 12.5%, 25%,...,100%). The below
implementation emitting(getResult()) a single event 20 minutes after
watching a video.





.window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)


.aggregate(new EventAggregator())

                                                                .filter(new
FinalFilter())


.addSink(...)


Appreciate your help.


Thanks,

chandu

Reply via email to