Can you store the history in an OrderedListState? This state should allow you to efficiently delete old versions.
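
A rough sketch of the bookkeeping this would allow, written as plain Python rather than Beam code. It assumes the "running average" is the mean of each day's highest-version score, that days are identified by an integer day index, and that a day can be folded into a running sum and count once it is older than the 3-day lateness mentioned below. The class and method names are hypothetical. In a real pipeline the per-day entries would live in the OrderedListState keyed by timestamp, and the expiry step would correspond to reading and then clearing a timestamp range.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class RunningAverageState:
    """Hypothetical stand-in for per-key state; this is not a Beam API."""

    allowed_lateness_days: int = 3  # assumption taken from the "3-day late inputs"
    # day index -> (version, score) for days that can still be updated
    open_days: Dict[int, Tuple[int, float]] = field(default_factory=dict)
    closed_sum: float = 0.0   # sum of final scores for expired days
    closed_count: int = 0     # number of expired days that had a score
    cutoff_day: int = 0       # days below this index are already folded in

    def update(self, day: int, version: int, score: float) -> bool:
        """Keep only the highest-version score per open day."""
        if day < self.cutoff_day:
            return False  # too late: the day was already folded into the totals
        current = self.open_days.get(day)
        if current is not None and version <= current[0]:
            return False  # out-of-order or duplicate version, ignore
        self.open_days[day] = (version, score)
        return True

    def expire(self, watermark_day: int) -> None:
        """Fold days older than the lateness horizon into the totals."""
        cutoff = watermark_day - self.allowed_lateness_days
        for day in sorted(d for d in self.open_days if d < cutoff):
            _, score = self.open_days.pop(day)
            self.closed_sum += score
            self.closed_count += 1
        self.cutoff_day = max(self.cutoff_day, cutoff)

    def average(self) -> Optional[float]:
        """Mean over expired days plus the latest score of each open day."""
        total = self.closed_sum + sum(s for _, s in self.open_days.values())
        count = self.closed_count + len(self.open_days)
        return total / count if count else None
```

With this shape the state holds at most a few days of per-day entries plus two numbers, so it does not grow with the number of days processed. Whether folding old days is acceptable depends on whether scores can really never change after the lateness horizon.
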

On Tue, Apr 20, 2021 at 9:58 AM Raman Gupta <rocketra...@gmail.com> wrote:

> I have a running average problem. As I understand it, the traditional Beam
> solution is state in a global window, but I'm not quite sure how to
> approach it for my use case, which is a bit more complex.
>
> I have a "score" output every 5 minutes based on a timer, up to a maximum
> of 1 hour after some time, depending on the arrival times of a few input
> events per day.
>
> The output of this initial part of the pipeline is 1) versioned, so when
> running the pipeline in batch mode, or dealing with up to 3-day late
> inputs, the score in the output system is continuously updated (and outputs
> from out-of-order inputs are ignored) and 2) aggregated into a daily score
> along with inputs coming from other pipeline branches, which is also
> continuously updated part-way through the day with early and late triggers.
>
> Now, I need to calculate the running average of the individual scores
> output every 5 minutes multiple times per day, and factor those into the
> overall aggregated daily score. The running average should consider only
> the highest version score for each day on which there are scores.
>
> I don't see how I can do this with global windows without keeping a full
> history of the latest score and version on every previous day, which will
> grow without bound. Or am I missing something?
>
> Thanks,
> Raman