Finally, I published the POC of the minibatch for the TopN function [1]. It covers all implementations of TopN functions, because it buffers the records before passing them to the collector inside AbstractTopNFunction.

To demonstrate the performance gain I used Nexmark q19, extended with an INNER JOIN after the TopN function. Even with an output buffer of only 1,000 records the results are promising: the number of output records of the TopN function decreased from 490,000 to 136,000, and the job execution time dropped by a factor of 3. The details of the experiment and the measurements can be found in document [2] (under the "Nexmark" subtitle).
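To make the folding rule concrete, here is a minimal sketch of the output-buffer idea in plain Java. All names (TopNOutputBuffer, Change, Kind, etc.) are invented for this email and are not the actual POC code or Flink internals: a revoke record folds against a still-buffered accumulate record, while accumulate records are never folded against a previous batch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only -- class, enum and method names are invented
// for this email and are NOT Flink's actual internals. The buffer holds
// changelog records not yet emitted; a revoke (-U/-D) arriving while its
// matching accumulate (+I/+U) record is still buffered cancels it out, so
// neither record reaches the downstream operator. Accumulate records are
// never folded against records already flushed in a previous batch.
class TopNOutputBuffer {
    enum Kind { INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE }

    record Change(Kind kind, String key, int rank, String value) {}

    private final List<Change> buffer = new ArrayList<>();
    private final int maxSize;

    TopNOutputBuffer(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Buffers one change; returns the records flushed downstream (possibly none). */
    List<Change> add(Change c) {
        if (c.kind() == Kind.UPDATE_BEFORE || c.kind() == Kind.DELETE) {
            // A revoke folds against a buffered accumulate for the same
            // key, rank and value: the pair annihilates inside the buffer.
            for (Iterator<Change> it = buffer.iterator(); it.hasNext(); ) {
                Change b = it.next();
                boolean accumulate =
                        b.kind() == Kind.INSERT || b.kind() == Kind.UPDATE_AFTER;
                if (accumulate && b.key().equals(c.key())
                        && b.rank() == c.rank() && b.value().equals(c.value())) {
                    it.remove();
                    return List.of();
                }
            }
        }
        buffer.add(c);
        return buffer.size() >= maxSize ? flush() : List.of();
    }

    /** Emits everything still buffered (called on overflow and at batch end). */
    List<Change> flush() {
        List<Change> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}
```

With this scheme, a row that is pushed into the top N and displaced again within the same batch produces no output at all, which is where the reduction in output records comes from.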
Looking forward to your feedback.

[1] https://github.com/rovboyko/flink/tree/feature/topn-output-buffer
[2] https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
m.: +79059592443
telegram: @rboyko

On Thu, 28 Mar 2024 at 06:54, Roman Boyko <ro.v.bo...@gmail.com> wrote:

> Hi Xushuai!
>
> Thank you for your reply!
>
> 1. Yes, you are absolutely right - we can't fold the records inside the
> output buffer if the current record provided to output has accumulate
> type (+I or +U). Only revoke-type records (-U or -D, whether produced by
> the current TopN function or received by it as input) can trigger the
> folding process inside the buffer. Thus an accumulate message emitted in
> a previous batch is guaranteed to receive its revoke message in a later
> batch. I added the description of the folding rules to the document [1].
> 2. Absolutely correct - the main purpose of this optimization is to
> reduce the workload of downstream operators. It wouldn't increase the
> performance of the TopN operator itself.
> 3. Ok, I'll try to do it after I finish the POC code.
>
> [1]
> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
>
> --
> Best regards,
> Roman Boyko
> e.: ro.v.bo...@gmail.com
> m.: +79059592443
> telegram: @rboyko
>
> On Thu, 28 Mar 2024 at 09:41, shuai xu <xushuai...@gmail.com> wrote:
>
>> Hi, Roman
>>
>> Thanks for your proposal. I think this is an interesting idea, and it
>> might be useful when there are operators downstream of the TopN.
>> I have some questions about your proposal after reading your doc.
>>
>> 1. From the input-output perspective, only the accumulated data seems to
>> be sent. If the accumulated record +recordA has already been sent in the
>> previous batch, would the -recordA be sent in this batch?
>> Could you provide a detailed rule about folding redundant records?
>>
>> 2. Minibatch Join [1] reduces state access during the join process
>> because it folds redundant records before they enter the process. From
>> your doc, folding redundant records happens after the TopN process. In
>> other words, it does not reduce the pressure of state access on TopN
>> itself, but rather folds the potentially redundant output records. Is
>> that right?
>>
>> 3. For the optimization results, the metric of output rows may not be
>> persuasive. Could you offer a result with a metric from Nexmark?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>>
>> Best,
>> Xushuai
>>
>> > On 26 Mar 2024 at 00:00, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >
>> > Hi Ron,
>> > Thank you so much for your reply!
>> >
>> > 1. I added the description to the Motivation part of my document [1].
>> > 2. I plan to inject this functionality into AbstractTopNFunction, so
>> > it will work for all its implementations. It doesn't depend on the
>> > implementation (whether AppendOnlyTopNFunction or
>> > RetractableTopNFunction, except maybe FastTop1Function). It would have
>> > the most effect for:
>> >     - TopN functions without the no-ranking optimization [2]
>> >     - a high value of N (Top1 has less opportunity for optimization
>> >       here)
>> >     - frequent input records that land in the top-1 position
>> > 3. I will do it within a week - I need to fix and recheck some parts.
>> > 4. Unfortunately I don't have permissions for the Flink Confluence,
>> > and according to the contribution guide I first expressed my idea as a
>> > Google doc. I would be happy to transform this idea into a FLIP.
>> >
>> > [1]
>> > https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
>> > [2]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/#no-ranking-output-optimization
>> >
>> > --
>> > Best regards,
>> > Roman Boyko
>> > e.: ro.v.bo...@gmail.com
>> > m.: +79059592443
>> > telegram: @rboyko
>> >
>> > On Mon, 25 Mar 2024 at 15:12, Ron liu <ron9....@gmail.com> wrote:
>> >
>> >> Hi, Roman
>> >>
>> >> Thanks for your proposal. I intuitively feel that this optimization
>> >> would be very useful for reducing the message amplification of TopN
>> >> operators. After briefly looking at your Google doc, I have the
>> >> following questions:
>> >>
>> >> 1. Could you describe in detail the principle of solving the TopN
>> >> operator's record amplification, similar to Minibatch Join [1]? From
>> >> the figure in the current Motivation part I cannot understand how you
>> >> did it.
>> >> 2. TopN currently has multiple implementation functions, including
>> >> AppendOnlyFirstNFunction, AppendOnlyTopNFunction, FastTop1Function,
>> >> RetractableTopNFunction, and UpdatableTopNFunction. Could you
>> >> elaborate on which patterns the minibatch optimization applies to?
>> >> 3. Is it possible to provide the PoC code?
>> >> 4. Finally, we need a formal FLIP document on the wiki [2].
>> >>
>> >> [1]
>> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>> >> [2]
>> >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> >>
>> >> Best,
>> >> Ron
>> >>
>> >> On Sun, 24 Mar 2024 at 01:14, Roman Boyko <ro.v.bo...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi Flink Community,
>> >>>
>> >>> I tried to describe my idea about minibatch for the TopN function in
>> >>> this doc -
>> >>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk/edit?usp=sharing
>> >>>
>> >>> Looking forward to your feedback, thank you.
>> >>>
>> >>> On Tue, 19 Mar 2024 at 12:24, Roman Boyko <ro.v.bo...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hello Flink Community,
>> >>>>
>> >>>> The same record amplification problem described in FLIP-415:
>> >>>> Introduce a new join operator to support minibatch [1] exists for
>> >>>> most implementations of AbstractTopNFunction, especially when the
>> >>>> rank is included in the output. For example, when calculating
>> >>>> Top100 with rank output, every input record might produce 100 -U
>> >>>> records and 100 +U records.
>> >>>>
>> >>>> According to my POC (which is similar to FLIP-415), the record
>> >>>> amplification can be significantly reduced by using an input or
>> >>>> output buffer.
>> >>>>
>> >>>> What do you think about implementing such an optimization for the
>> >>>> TopN functions?
>> >>>>
>> >>>> [1]
>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>> >>>>
>> >>>> --
>> >>>> Best regards,
>> >>>> Roman Boyko
>> >>>> e.: ro.v.bo...@gmail.com
>> >>>> m.: +79059592443
>> >>>> telegram: @rboyko
>> >>>
>> >>> --
>> >>> Best regards,
>> >>> Roman Boyko
>> >>> e.: ro.v.bo...@gmail.com
>> >>> m.: +79059592443
>> >>> telegram: @rboyko