I have finally published the POC of minibatch for the TopN function [1]. It
covers all implementations of the TopN function because it buffers
records inside AbstractTopNFunction before passing them to the collector.
To demonstrate the performance gain I used Nexmark q19, extended with an
INNER JOIN after the TopN function. An output buffer of only 1000 records
already gives good results: the number of records output by the TopN
function decreased from 490,000 to 136,000, and job execution time
decreased by a factor of 3. The details of the experiment and the
measurements can be found in document [2] (under the Nexmark subtitle).
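
To make the folding idea concrete, here is a minimal sketch of such an
output buffer. It is not the POC code from [1]: the class
FoldingOutputBuffer, its Kind enum, the string row representation and the
Consumer used as a stand-in for the collector are all hypothetical, and the
linear scan is a simplification. It assumes the rule that only a retract
record (-U or -D) can cancel a matching accumulate record (+I or +U) that is
still sitting in the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical output buffer that folds retractions against buffered accumulate records. */
final class FoldingOutputBuffer {

    /** Changelog kinds, named after Flink's RowKind values. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String tag;

        Kind(String tag) { this.tag = tag; }

        boolean isRetract() { return this == UPDATE_BEFORE || this == DELETE; }
    }

    /** One buffered output record: a kind plus the full output row (rank included). */
    private static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) { this.kind = kind; this.row = row; }
    }

    private final int capacity;
    private final Consumer<String> downstream; // stand-in for the real collector
    private final List<Change> buffer = new ArrayList<>();

    FoldingOutputBuffer(int capacity, Consumer<String> downstream) {
        this.capacity = capacity;
        this.downstream = downstream;
    }

    /** Buffers one output record, folding a retraction into a matching buffered accumulate. */
    void add(Kind kind, String row) {
        if (kind.isRetract()) {
            // Scan newest-first for a +I/+U of the same row that has not been emitted yet.
            for (int i = buffer.size() - 1; i >= 0; i--) {
                Change c = buffer.get(i);
                if (!c.kind.isRetract() && c.row.equals(row)) {
                    buffer.remove(i); // the pair cancels out; neither record goes downstream
                    return;
                }
            }
        }
        // Accumulate records, and retractions of rows emitted in an earlier batch, are kept.
        buffer.add(new Change(kind, row));
        if (buffer.size() >= capacity) {
            flush();
        }
    }

    /** Emits all buffered records in arrival order and clears the buffer. */
    void flush() {
        for (Change c : buffer) {
            downstream.accept(c.kind.tag + "[" + c.row + "]");
        }
        buffer.clear();
    }
}
```

With a capacity of 1000, adding -U[a,1], +U[b,1], +U[a,2], -U[b,1], +U[c,1]
and then calling flush() emits only -U[a,1], +U[a,2], +U[c,1]: the +U/-U
pair for row "b,1" never leaves the operator, while the retraction of
"a,1" (emitted in an earlier batch) is preserved.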

Looking forward to your feedback.

[1]
https://github.com/rovboyko/flink/tree/feature/topn-output-buffer
[2]
https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
m.: +79059592443
telegram: @rboyko

On Thu, 28 Mar 2024 at 06:54, Roman Boyko <ro.v.bo...@gmail.com> wrote:

> Hi Xushuai!
>
> Thank you for your reply!
>
> 1. Yes, you are absolutely right - we can't fold records inside the output
> buffer if the current record being sent to output is of an accumulate
> type (+I or +U). Only retract-type records (-U or -D, either produced by
> the current TopN function or received by it as input) can trigger
> folding inside the buffer. Thus an accumulate message that was sent to
> output in a previous batch is guaranteed to have its retract message sent
> in a subsequent batch. I added a description of the folding rules to the
> document [1].
> 2. Absolutely correct, the main purpose of this optimization is to reduce
> the workload of downstream operators. It wouldn't improve the
> performance of the TopN operator itself.
> 3. Ok, I'll try to do it after I finish the POC code.
>
> [1]
> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
>
>
> On Thu, 28 Mar 2024 at 09:41, shuai xu <xushuai...@gmail.com> wrote:
>
>> Hi, Roman
>>
>> Thanks for your proposal. I think this is an interesting idea and it
>> might be useful when there are operators downstream of the TopN.
>> And I have some questions about your proposal after reading your doc.
>>
>> 1. From the input-output perspective, only the accumulated data seems to
>> be sent. If the accumulated data +recordA has already been sent in the
>> previous batch, would -recordA be sent in this batch? Could you provide
>> a detailed rule for folding redundant records?
>>
>> 2. The Minibatch Join[1] reduces state access during the join process
>> because it folds redundant records before they enter the process. From
>> your doc, folding redundant records is implemented after the TopN
>> process. In other words, it does not reduce the pressure of state access
>> on TopN itself, but rather just folds the output results that could be
>> redundant. Is that right?
>>
>> 3. For the optimization results, the output row count alone may not be
>> persuasive. Could you provide results using the Nexmark metrics?
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>>
>> Best,
>> Xushuai
>> > On 26 Mar 2024, at 00:00, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >
>> > Hi Ron,
>> > Thank you so much for your reply!
>> >
>> > 1. I added the description to the Motivation part of my document [1].
>> > 2. I propose to inject this functionality into AbstractTopNFunction, so
>> > it will work for all its implementations. It doesn't depend on the
>> > implementation (whether AppendOnlyTopNFunction or
>> > RetractableTopNFunction, except maybe FastTop1Function). It would have
>> > the most effect for:
>> >     - TopN functions without the no-ranking optimization [2]
>> >     - high values of N (Top1 has fewer opportunities for optimization
>> >       here)
>> >     - frequent input records that land in the top 1 position
>> > 3. I will do it in a week - I need to fix and recheck some parts.
>> > 4. Unfortunately I don't have permissions for the Flink Confluence, and
>> > according to the contribution guide I first expressed my idea as a
>> > Google doc. I would be happy to turn this idea into a FLIP.
>> >
>> > [1]
>> > https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk
>> > [2]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/#no-ranking-output-optimization
>> >
>> > On Mon, 25 Mar 2024 at 15:12, Ron liu <ron9....@gmail.com> wrote:
>> >
>> >> Hi, Roman
>> >>
>> >> Thanks for your proposal. I intuitively feel that this optimization
>> >> would be very useful for reducing message amplification in TopN
>> >> operators. After briefly looking at your Google doc, I have the
>> >> following questions:
>> >>
>> >> 1. Could you describe in detail the principle behind solving the TopN
>> >> operator record amplification, similar to Minibatch Join[1]? From the
>> >> figure in the current Motivation part, I cannot understand how you did
>> >> it.
>> >> 2. TopN currently has multiple implementation functions, including
>> >> AppendOnlyFirstNFunction, AppendOnlyTopNFunction, FastTop1Function,
>> >> RetractableTopNFunction, and UpdatableTopNFunction. Could you elaborate
>> >> on which of these patterns the minibatch optimization applies to?
>> >> 3. Is it possible to provide the PoC code?
>> >> 4. Finally, we need a formal FLIP document on the wiki[2].
>> >>
>> >> [1]
>> >> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-regular-joins
>> >> [2]
>> >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> >>
>> >> Best,
>> >> Ron
>> >>
>> >> On Sun, 24 Mar 2024 at 01:14, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >>
>> >>> Hi Flink Community,
>> >>>
>> >>> I tried to describe my idea about minibatch for TopNFunction in this
>> >>> doc:
>> >>> https://docs.google.com/document/d/1YPHwxKfiGSUOUOa6bc68fIJHO_UojTwZEC29VVEa-Uk/edit?usp=sharing
>> >>>
>> >>> Looking forward to your feedback, thank you
>> >>>
>> >>> On Tue, 19 Mar 2024 at 12:24, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >>>
>> >>>> Hello Flink Community,
>> >>>>
>> >>>> The same record amplification problem as described in FLIP-415:
>> >>>> Introduce a new join operator to support minibatch[1] exists for most
>> >>>> implementations of AbstractTopNFunction, especially when the rank is
>> >>>> included in the output. For example, when calculating Top100 with rank
>> >>>> output, every input record might produce 100 -U records and 100 +U
>> >>>> records.
>> >>>>
>> >>>> According to my POC (which is similar to FLIP-415), the record
>> >>>> amplification could be significantly reduced by using an input or
>> >>>> output buffer.
>> >>>>
>> >>>> What do you think about implementing such an optimization for
>> >>>> TopNFunctions?
>> >>>>
>> >>>> [1]
>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-415%3A+Introduce+a+new+join+operator+to+support+minibatch
>> >>>>
>> >>>
>> >>>
>> >>
>>
>
