Hi Jark,

+1 for this proposal. Glad to see more details :)

Sincerely,
Weike

On Mon, Jan 25, 2021 at 2:03 PM Jark Wu <imj...@gmail.com> wrote:

> Hi all,
>
> I would like to propose introducing a new method "retractAccumulators()" to
> the `AggregateFunction` in Table/SQL.
>
> *Motivation*
>
> The motivation is to improve the performance of hopping (sliding) windows.
> Currently, we have a pane-based (also called sliced) optimization for
> hopping windows in Table/SQL: each element is accumulated into only a
> single pane, and once a window is fired, we merge multiple panes to get
> the window result.
>
> For example, with HOP(size=10s, slide=2s), the window [0, 10) consists of
> 5 panes: [0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
> Each element falls into a single pane, e.g. an element with timestamp 3
> falls into pane [2, 4).
>
> However, the panes are currently merged in JVM heap memory. For example,
> when window [0, 10) is about to be fired, we retrieve the accumulators of
> the 5 panes and merge them into an in-memory accumulator.
> This performs poorly, because the number of panes per window can be very
> large when the slide is small, e.g. 8640 panes for HOP(1day, 10s).
> The job may also run out of memory (OOM) when the accumulator is very
> large, e.g. for count distinct.
>
> Thus, I would like to introduce a "retractAccumulators()" method, which is
> the inverse of "merge()".
> With "retractAccumulators()", we can reduce the time complexity of firing
> a window from O(N) to O(1), where N is the number of panes per window.
> For example, when window [10, 20) is about to be fired, we only need to
> retract the accumulator of pane [8, 10) from the state of the previous
> window [8, 18) and merge the accumulator of pane [18, 20) into it.
>
> This will be a great performance improvement, giving hopping windows
> performance similar to tumbling windows, no matter how small the slide is.
> We can also avoid OOM, because the merged accumulator is kept in state
> instead of in memory.
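
To make the O(N) vs. O(1) point above concrete, here is a rough standalone sketch (not Flink code). It assumes each pane's accumulator is just a `long` sum; the class and method names (`HopRetractSketch`, `fireByMerging`, `fireByRetracting`, `fireAll`) are made up for illustration, and the first window is simplified to always have a full set of panes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: every pane accumulator is a plain long sum.
final class HopRetractSketch {

    // Current approach: merge every pane of the window, O(panesPerWindow) per firing.
    static long fireByMerging(long[] paneSums, int firstPane, int panesPerWindow) {
        long acc = 0;
        for (int i = firstPane; i < firstPane + panesPerWindow; i++) {
            acc += paneSums[i];
        }
        return acc;
    }

    // Proposed approach: start from the previous window's result, retract the
    // pane that slid out and merge the pane that slid in, O(1) per firing.
    static long fireByRetracting(long previousWindow, long expiredPane, long newPane) {
        return previousWindow - expiredPane + newPane;
    }

    // Fires every full window over the panes using the incremental approach.
    static List<Long> fireAll(long[] paneSums, int panesPerWindow) {
        List<Long> results = new ArrayList<>();
        if (paneSums.length < panesPerWindow) {
            return results;
        }
        // Only the very first window needs a full merge.
        long window = fireByMerging(paneSums, 0, panesPerWindow);
        results.add(window);
        for (int first = 1; first + panesPerWindow <= paneSums.length; first++) {
            long expired = paneSums[first - 1];
            long added = paneSums[first + panesPerWindow - 1];
            window = fireByRetracting(window, expired, added);
            results.add(window);
        }
        return results;
    }

    public static void main(String[] args) {
        // HOP(size=10s, slide=2s): 5 panes per window, panes [0, 2) ... [18, 20).
        long[] paneSums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(fireAll(paneSums, 5));
    }
}
```

The cost of `fireByRetracting` does not depend on the number of panes per window, which is why a small slide would no longer hurt.
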
>
> *Public Interface*
>
> We will introduce a contract method "retractAccumulators", which is similar
> to the "merge" method.
>
> Retracts a group of accumulator instances from one accumulator instance.
> This method is optional, but implementing it can greatly improve the
> performance of hopping window aggregates.
> Therefore, it is recommended to implement this method when the function is
> used with hopping windows.
>
> param: accumulator the accumulator which will keep the retracted aggregate
>        results. Note that the accumulator may contain the previous
>        aggregated results. Therefore users should not replace or clean
>        this instance in the custom retractAccumulators method.
> param: retractAccs a java.lang.Iterable pointing to a group of accumulators
>        that will be retracted.
>
> public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> retractAccs)
>
> What do you think?
>
> Best,
> Jark
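
For illustration, here is a minimal sketch of what a user-side implementation of the proposed contract might look like for an average aggregate. It is written without Flink dependencies so that it runs standalone; `AvgRetractSketch`, `AvgAcc` and `getValue` are hypothetical names, and only the `retractAccumulators(ACC, Iterable<ACC>)` shape is taken from the proposal above. In a real function these would presumably be instance methods on a class extending `AggregateFunction`.

```java
import java.util.Arrays;
import java.util.Collections;

// Hypothetical example with no Flink dependency, so it can be run on its own.
final class AvgRetractSketch {

    // Accumulator for an average: running sum and element count.
    static final class AvgAcc {
        long sum;
        long count;

        AvgAcc(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Same shape as the existing merge() contract: folds the others into accumulator.
    static void merge(AvgAcc accumulator, Iterable<AvgAcc> others) {
        for (AvgAcc other : others) {
            accumulator.sum += other.sum;
            accumulator.count += other.count;
        }
    }

    // Inverse of merge(), following the proposed signature. The accumulator is
    // updated in place and never replaced or reset, since it may already hold
    // previously aggregated results.
    static void retractAccumulators(AvgAcc accumulator, Iterable<AvgAcc> retractAccs) {
        for (AvgAcc retracted : retractAccs) {
            accumulator.sum -= retracted.sum;
            accumulator.count -= retracted.count;
        }
    }

    // Returns null when nothing has been accumulated.
    static Double getValue(AvgAcc accumulator) {
        return accumulator.count == 0 ? null : (double) accumulator.sum / accumulator.count;
    }

    public static void main(String[] args) {
        // State of the previous window [8, 18), built from its panes.
        AvgAcc window = new AvgAcc(0, 0);
        merge(window, Arrays.asList(new AvgAcc(4, 1), new AvgAcc(26, 5)));

        // Slide to [10, 20): retract pane [8, 10), then merge pane [18, 20).
        retractAccumulators(window, Collections.singletonList(new AvgAcc(4, 1)));
        merge(window, Collections.singletonList(new AvgAcc(10, 2)));

        System.out.println(getValue(window));
    }
}
```

Sum and count retract trivially by subtraction. Aggregates such as MIN/MAX or count distinct would need an accumulator that keeps enough detail (e.g. per-value counts) for the retraction to be exact.
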