Hi Xushuai! Thank you for your reply!
1. Yes, you are absolutely right: we can't fold records inside the
output buffer if the current record provided to the output has an
accumulate type (+I or +U). Only retract-type records (-U or -D, either
produced by the current TopN function or received by it as input) can
trigger folding inside the buffer. Thus an accumulate message that was
sent to the output in a previous batch is certain to receive its retract
message in the next batch. I added a description of the folding rules to
the document [1].

2. Absolutely correct: the main purpose of this optimization is to
reduce the workload of downstream operators. It won't improve the
performance of the TopN operator itself.

3. OK, I'll try to do it after I finish the POC code.

[1] https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
m.: +79059592443
telegram: @rboyko

On Thu, 28 Mar 2024 at 09:41, shuai xu <xushuai...@gmail.com> wrote:

> Hi, Roman
>
> Thanks for your proposal. I think this is an interesting idea and it might
> be useful when there are operators downstream of the TopN.
> And I have some questions about your proposal after reading your doc.
>
> 1. From the input-output perspective, only the accumulated data seems to
> be sent. If the accumulated data +recordA has already been sent in the
> previous batch, the -recordA would be sent in this batch? Could you provide
> a detailed rule about folding redundant records?
>
> 2. The Minibatch Join[1] reduces state access during the join process because
> it folds redundant records before entering the process. From your doc,
> folding redundant records is implemented after the TopN process. In other
> words, it does not reduce the pressure of state access on TopN itself, but
> rather just folds the output results that could be redundant. Is that right?
>
> 3.
> For the optimization results, the metric of output rows may not be
> persuasive. Could you offer a result with metrics from Nexmark?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>
> Best,
> Xushuai
>
> On 26 Mar 2024 at 00:00, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >
> > Hi Ron,
> > Thank you so much for your reply!
> >
> > 1. I added the description to the Motivation part of my document [1].
> > 2. I propose to inject this functionality into AbstractTopNFunction, so it
> > will work for all of its implementations. It doesn't depend on the
> > implementation (whether AppendOnlyTopNFunction or RetractableTopNFunction,
> > except maybe FastTop1Function). It would have the most effect for:
> > - TopN functions without the no-ranking optimization [2]
> > - a high value of N (Top1 has fewer possibilities for optimization here)
> > - frequent input records that are placed in the top 1 position
> > 3. I will do it in a week; I need to fix and recheck some parts.
> > 4. Unfortunately I don't have permissions for the Flink Confluence, and
> > according to the contribution guide I first expressed my idea as a Google
> > doc. I would be happy to turn this idea into a FLIP.
> >
> > [1]
> > https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/#no-ranking-output-optimization
> >
> > --
> > Best regards,
> > Roman Boyko
> > e.: ro.v.bo...@gmail.com
> > m.: +79059592443
> > telegram: @rboyko
> >
> > On Mon, 25 Mar 2024 at 15:12, Ron liu <ron9....@gmail.com> wrote:
> >
> >> Hi, Roman
> >>
> >> Thanks for your proposal, I intuitively feel that this optimization would
> >> be very useful to reduce the amount of message amplification for TopN
> >> operators.
> >> After briefly looking at your google docs, I have the following
> >> questions:
> >>
> >> 1. Could you describe in detail the principle of solving the TopN
> >> operator record amplification, similar to Minibatch Join[1]? From the
> >> figure in the current Motivation part, I cannot understand how you did it.
> >> 2. TopN currently has multiple implementation functions, including
> >> AppendOnlyFirstNFunction, AppendOnlyTopNFunction, FastTop1Function,
> >> RetractableTopNFunction, UpdatableTopNFunction. Is it possible to elaborate
> >> on which patterns the Minibatch optimization applies to?
> >> 3. Is it possible to provide the PoC code?
> >> 4. Finally, we need a formal FLIP document on the wiki[2].
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
> >> [2]
> >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>
> >> Best,
> >> Ron
> >>
> >> On Sun, 24 Mar 2024 at 01:14, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >>
> >>> Hi Flink Community,
> >>>
> >>> I tried to describe my idea about minibatch for TopNFunction in this doc:
> >>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk/edit?usp=sharing
> >>>
> >>> Looking forward to your feedback, thank you
> >>>
> >>> On Tue, 19 Mar 2024 at 12:24, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >>>
> >>>> Hello Flink Community,
> >>>>
> >>>> The same problem with record amplification as described in FLIP-415:
> >>>> Introduce a new join operator to support minibatch[1] exists for most
> >>>> implementations of AbstractTopNFunction, especially when the rank is
> >>>> provided to output. For example, when calculating Top100 with rank
> >>>> output, every input record might produce 100 -U records and 100 +U records.
> >>>>
> >>>> According to my POC (which is similar to FLIP-415), the record
> >>>> amplification could be significantly reduced by using an input or
> >>>> output buffer.
> >>>>
> >>>> What do you think about implementing such an optimization for
> >>>> TopNFunctions?
> >>>>
> >>>> [1]
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
> >>>>
> >>>> --
> >>>> Best regards,
> >>>> Roman Boyko
> >>>> e.: ro.v.bo...@gmail.com
> >>>> m.: +79059592443
> >>>> telegram: @rboyko
> >>>>
> >>>
> >>> --
> >>> Best regards,
> >>> Roman Boyko
> >>> e.: ro.v.bo...@gmail.com
> >>> m.: +79059592443
> >>> telegram: @rboyko
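
The folding rule from point 1 of the reply at the top of this thread can be sketched in Java roughly as follows. This is a minimal illustration under stated assumptions, not the POC code: the names OutputBufferFolding, Kind, Change and Buffer are hypothetical, rows are simplified to strings, records are matched on row content and rank only, and the input sequence in main is hand-written rather than produced by a real TopN function. How a real implementation would adjust the kind of the surviving records (for example +U versus +I) is not covered here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an output buffer that folds redundant changelog records.
final class OutputBufferFolding {

    // Changelog kinds; "accumulate" is true for +I and +U, false for -U and -D.
    enum Kind {
        INSERT("+I", true),
        UPDATE_BEFORE("-U", false),
        UPDATE_AFTER("+U", true),
        DELETE("-D", false);

        final String tag;
        final boolean accumulate;

        Kind(String tag, boolean accumulate) {
            this.tag = tag;
            this.accumulate = accumulate;
        }
    }

    // One output record: kind, row payload (simplified to a String) and rank.
    static final class Change {
        final Kind kind;
        final String row;
        final int rank;

        Change(Kind kind, String row, int rank) {
            this.kind = kind;
            this.row = row;
            this.rank = rank;
        }

        // Assumption: two records refer to the same output if row and rank match.
        boolean sameContent(Change other) {
            return rank == other.rank && row.equals(other.row);
        }

        @Override
        public String toString() {
            return kind.tag + "(" + row + ", rank=" + rank + ")";
        }
    }

    static final class Buffer {
        private final List<Change> pending = new ArrayList<>();

        void add(Change change) {
            // Only a retract record (-U or -D) may trigger folding.
            if (!change.kind.accumulate) {
                // Look for the newest buffered accumulate record with the same content.
                for (int i = pending.size() - 1; i >= 0; i--) {
                    Change buffered = pending.get(i);
                    if (buffered.kind.accumulate && buffered.sameContent(change)) {
                        pending.remove(i); // the pair cancels out inside this batch
                        return;
                    }
                }
            }
            // Accumulate records, and retracts whose counterpart was already
            // emitted in an earlier batch, are buffered unchanged.
            pending.add(change);
        }

        // Returns the buffered records in order and empties the buffer.
        List<Change> flush() {
            List<Change> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        // Hand-written Top-2 batch: rows a, b, c arrive, each outranking the previous one.
        buffer.add(new Change(Kind.INSERT, "a", 1));
        buffer.add(new Change(Kind.UPDATE_BEFORE, "a", 1));
        buffer.add(new Change(Kind.UPDATE_AFTER, "b", 1));
        buffer.add(new Change(Kind.INSERT, "a", 2));
        buffer.add(new Change(Kind.UPDATE_BEFORE, "b", 1));
        buffer.add(new Change(Kind.UPDATE_AFTER, "c", 1));
        buffer.add(new Change(Kind.UPDATE_BEFORE, "a", 2));
        buffer.add(new Change(Kind.UPDATE_AFTER, "b", 2));
        System.out.println(buffer.flush());
    }
}
```

In this hand-written batch the eight emitted records fold down to two. A retract record whose accumulate counterpart already left in an earlier batch finds no match in the buffer and is forwarded unchanged, which is the behaviour described in point 1.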