Hi all,

Is there any more feedback I can incorporate before calling a vote?

Thanks,
Alan

On Tue, Jan 28, 2025 at 1:50 PM Alan Sheinberg <asheinb...@confluent.io>
wrote:

> Hi Fabian,
>
> I addressed your comments below.
>
>
>> * BundledKeySegment
>>   * should it be accumulatorS instead of accumulator?
>>   * should accumulators be null or an empty list if no accumulator is
>> present?
>
>
> Good catch, forgot to update those places.  It should be "accumulators"
> and an empty list when there are none. Updated in the FLIP.
>
>>
>> * update the Group By Example to use a list of accumulators instead of a
>> single value
>
>
> Updated.
>
>> * fix the `AvgAggregate` example:
>>   * add missing `canRetract()` and `canMerge()` methods
>>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>>   * example works with a single accumulator, not a list of accumulators
>
>
> Updated AvgAggregate to address these things.  They should be consistent
> with the interface discussed.
>
>> * in general, check the code examples for compliance with the proposed API.
>>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>>   * There might be other mistakes that sneaked in while the API evolved
>
>
> Yeah, you're right.  I'll read through it again and try to find any
> missing updates.
>
> Thanks,
> Alan
>
> On Tue, Jan 28, 2025 at 3:58 AM Fabian Hüske <fhue...@confluent.io.invalid>
> wrote:
>
>> Thanks Alan for updating the FLIP!
>>
>> IMO, it looks good.
>>
>> Just a few nits that would be great to fix for consistency:
>>
>> * BundledKeySegment
>>   * should it be accumulatorS instead of accumulator?
>>   * should accumulators be null or an empty list if no accumulator is
>> present?
>> * update the Group By Example to use a list of accumulators instead of a
>> single value
>> * fix the `AvgAggregate` example:
>>   * add missing `canRetract()` and `canMerge()` methods
>>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>>   * example works with a single accumulator, not a list of accumulators
>> * in general, check the code examples for compliance with the proposed
>> API.
>>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>>   * There might be other mistakes that sneaked in while the API evolved
>>
>> Thank you,
>> Fabian
>>
>>
>> On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg
>> <asheinb...@confluent.io.invalid> wrote:
>>
>> > Hi everyone,
>> >
>> > Sorry for the delayed response!  I appreciate the comments.
>> >
>> > Addressing Timo's comments:
>> >
>> > > Correct me if I'm wrong, but for AggregateFunctions implementing a
>> > > retract() and merge() is optional. How can a BundledAggregateFunction
>> > > communicate whether or not this is supported to the planner? Enforcing
>> > > the retract() feature in the interface specification could be an
>> > > option, but esp for window aggregations there might not be a retract
>> > > required.
>> >
>> >
>> > This is a good catch.  Much of my experimenting with a POC was done with
>> > a retract call and it slipped my mind that it's optional.  I think this
>> > will have to be added to the BundledAggregateFunction interface.
>> >
>> > > Also how do you plan to support merge() in this design? I couldn't find
>> > > any mention of it in the FLIP.
>> > >
>> > Searching through the operators that use merge, it wasn't clear to me
>> > that the implementation would require it, so I didn't think it needed
>> > support.  I now see it's used in windows and maybe elsewhere.  I'll add a
>> > list of accumulators rather than a single one -- the first step will be
>> > to merge the accumulators before applying any of the accumulate or
>> > retract calls.  I need to look more closely at the operators that will
>> > use them, but I think it may make sense to do it this way.  Tell me if
>> > you feel strongly that they should be separate method calls.
>> >
>> > Addressing Fabian's comments:
>> >
>> > > * Why do we need the `canBundle()` function? We can use the interface
>> > > itself as a marker. A function that can't bundle shouldn't implement
>> > > the interface.
>> > >   * the interface could just contain the
>> > > `bundledAccumulateRetract(List<BKS> bundle)` method?
>> >
>> >
>> > I originally had it just like you're recommending, but that removes a
>> > layer of indirection and some flexibility, because it means that a class
>> > must statically decide whether it's a bundled aggregate function or not.
>> > FunctionDefinition doesn't work this way -- at planning time you can
>> > determine what function kind you are, rather than fixing it by directly
>> > extending ScalarFunction, for example.  This flexibility allows us to
>> > implement a meta UDF which can try to be any kind of UDF and then
>> > specialize during runtime.  For example, this would allow it to return
>> > canBundle only when it's an aggregate which wants to implement the
>> > bundled interface.
>> >
>> > > * why does a function call need to handle multiple keys?
>> > >   * It feels that the key-loop is something that 90% of the functions
>> > > would implement the same.
>> > >   * Couldn't we have a default implementation for the key loop and
>> > > another method that just handles the bundle of a single key?
>> >
>> >
>> > I think so.  So you mean you could overload multiple methods, one for a
>> > single key and another for multiple keys, where the default for the
>> > latter would just call the former N times?  That would be fine.  In some
>> > cases you would want to override the multi-key version, to minimize
>> > external calls for example, but in others you might not.
>> >
>> > > * does the list of updatedValuesAfterEachRow also include the last row?
>> > >   * it would be good to clarify that in the API documentation
>> >
>> >
>> > It includes the first and last value (and everything in between), so it
>> > duplicates the values that are also set in startingValue and finalValue.
>> > I can be more explicit about this field.
>> >
>> > > * Could you clarify what the table.exec.agg-bundled.size parameter
>> > > refers to?
>> > >   * Number of keys? Total number of rows? Number of rows per key?
>> >
>> >
>> > The total number of input rows, which may be for one or more keys.  Let
>> > me clarify that.
>> >
>> > > * It would be nice to add the keys in the example
>> >
>> > You're right.  It's a bit confusing without them.  I'll add them.
>> >
>> > > * How do the bundled agg functions interact with checkpointing?
>> > >   * Are bundles sent out when a checkpoint barrier is received?
>> >
>> >
>> > Yes, for group by, it would flush the bundle when a checkpoint barrier is
>> > received.  I think we would ultimately want to do this async so as not to
>> > block the checkpoint (in case the bundle takes a while), but the first
>> > implementation will likely be straightforward.  I can make a followup
>> > task for this.
>> >
>> > For the over implementations, processElement uses ValueState and
>> > MapState to store all the intermediate data and uses watermark-driven
>> > event-time timers to actually do the bundle calls.  I think for these
>> > there is no special behavior for checkpoints, and they shouldn't hold
>> > checkpoints up at all.
>> >
>> > For the windowed aggregate implementations, it would also flush the
>> > bundle when a checkpoint barrier is received.  I think this will
>> > ultimately need to be made async as well, but the first implementation
>> > will flush when the barrier is received.
>> >
>> > I'll update the FLIP to address the comments above.  Please have another
>> > look.
>> >
>> > Thanks,
>> > Alan
>> >
>> >
>> > On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske
>> > <fhue...@confluent.io.invalid> wrote:
>> >
>> > > Hi Alan,
>> > >
>> > > Thanks for working on this FLIP!
>> > > Overall the document looks very good, but I have a few questions /
>> > > comments:
>> > >
>> > > * Why do we need the `canBundle()` function? We can use the interface
>> > > itself as a marker. A function that can't bundle shouldn't implement
>> > > the interface.
>> > >   * the interface could just contain the
>> > > `bundledAccumulateRetract(List<BKS> bundle)` method?
>> > >
>> > > * why does a function call need to handle multiple keys?
>> > >   * It feels that the key-loop is something that 90% of the functions
>> > > would implement the same.
>> > >   * Couldn't we have a default implementation for the key loop and
>> > > another method that just handles the bundle of a single key?
>> > >
>> > > * does the list of updatedValuesAfterEachRow also include the last row?
>> > >   * it would be good to clarify that in the API documentation
>> > >
>> > > * Could you clarify what the table.exec.agg-bundled.size parameter
>> > > refers to?
>> > >   * Number of keys? Total number of rows? Number of rows per key?
>> > >
>> > > * It would be nice to add the keys in the example
>> > >
>> > > * How do the bundled agg functions interact with checkpointing?
>> > >   * Are bundles sent out when a checkpoint barrier is received?
>> > >
>> > > Thank you, Fabian
>> > >
>> > > On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org>
>> > > wrote:
>> > >
>> > > > Hi Alan,
>> > > >
>> > > > thanks for sharing this FLIP with us. Sorry for the late reply. I
>> > > > think the FLIP is already in very good shape.
>> > > >
>> > > > I like the approach that the BundledAggregateFunction is rather a
>> > > > separate interface that can be implemented by advanced users. This
>> > > > matches with the existing SpecializedFunction interface and the
>> > > > upcoming ChangelogFunction of FLIP-440.
>> > > >
>> > > > However, I have some additional feedback:
>> > > >
>> > > > Correct me if I'm wrong, but for AggregateFunctions implementing a
>> > > > retract() and merge() is optional. How can a BundledAggregateFunction
>> > > > communicate whether or not this is supported to the planner? Enforcing
>> > > > the retract() feature in the interface specification could be an
>> > > > option, but esp for window aggregations there might not be a retract
>> > > > required.
>> > > >
>> > > > Also how do you plan to support merge() in this design? I couldn't
>> > > > find any mention of it in the FLIP.
>> > > >
>> > > > Regards,
>> > > > Timo
>> > > >
>> > > >
>> > > >
>> > > > On 12.12.24 02:57, Alan Sheinberg wrote:
>> > > > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction
>> > > > > for batched aggregation [1]
>> > > > >
>> > > > > This feature proposes adding a new interface BundledAggregateFunction
>> > > > > that can be implemented by AggregateFunction UDFs.  This allows the
>> > > > > use of a batched method call so that users can handle many rows at a
>> > > > > time for multiple keys rather than the per-row calls such as
>> > > > > accumulate and retract.
>> > > > >
>> > > > > The purpose is to achieve high throughput while still allowing for
>> > > > > calls to external systems or other blocking operations.  Similar
>> > > > > calls through the conventional AggregateFunction methods would be
>> > > > > prohibitively slow, but if given a batch of inputs and accumulators
>> > > > > for each key, the implementer has the power to parallelize or
>> > > > > internally batch lookups to improve performance.
>> > > > >
>> > > > > Looking forward to your feedback and suggestions.
>> > > > >
>> > > > > [1]
>> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
>> > > > >
>> > > > >
>> > > > > Thanks,
>> > > > > Alan
>> > > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
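
For illustration only, the single-key / multi-key split and the merge-first ordering discussed in the thread above might look roughly like the sketch below. Every name in it (`Row`, `KeySegment`, `KeyResult`, `BundledSum`, `processKey`, `processBundle`) is a hypothetical stand-in and not the FLIP-491 API, and a plain sum is used in place of a real aggregate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only: all types and method names are assumptions,
// not the interfaces proposed in FLIP-491.
public class BundledSumSketch {

    /** Hypothetical input row: a value plus a flag marking a retraction. */
    static final class Row {
        final long value;
        final boolean retract;
        Row(long value, boolean retract) { this.value = value; this.retract = retract; }
    }

    /** Hypothetical per-key segment; accumulators is an empty list when there are none. */
    static final class KeySegment {
        final String key;
        final List<Long> accumulators;
        final List<Row> rows;
        KeySegment(String key, List<Long> accumulators, List<Row> rows) {
            this.key = key;
            this.accumulators = accumulators;
            this.rows = rows;
        }
    }

    /** Hypothetical per-key result carrying the final accumulator value. */
    static final class KeyResult {
        final String key;
        final long finalValue;
        KeyResult(String key, long finalValue) { this.key = key; this.finalValue = finalValue; }
    }

    interface BundledSum {
        /** Handles the bundle of a single key. */
        KeyResult processKey(KeySegment segment);

        /** Default key loop: calls the single-key method once per key. */
        default List<KeyResult> processBundle(List<KeySegment> bundle) {
            List<KeyResult> results = new ArrayList<>();
            for (KeySegment segment : bundle) {
                results.add(processKey(segment));
            }
            return results;
        }
    }

    static final class SimpleSum implements BundledSum {
        @Override
        public KeyResult processKey(KeySegment segment) {
            long acc = 0L;
            // Step 1: merge all incoming accumulators (an empty list merges to 0).
            for (long a : segment.accumulators) {
                acc += a;
            }
            // Step 2: apply accumulate / retract calls in row order.
            for (Row row : segment.rows) {
                acc += row.retract ? -row.value : row.value;
            }
            return new KeyResult(segment.key, acc);
        }
    }

    public static void main(String[] args) {
        List<KeySegment> bundle = Arrays.asList(
            new KeySegment("a", Arrays.asList(10L, 5L),
                Arrays.asList(new Row(3, false), new Row(6, true))),
            new KeySegment("b", Collections.<Long>emptyList(),
                Arrays.asList(new Row(4, false))));
        for (KeyResult r : new SimpleSum().processBundle(bundle)) {
            System.out.println(r.key + " -> " + r.finalValue);
        }
    }
}
```

An implementation that talks to an external system could override the multi-key method instead, so that it issues one batched call for all keys rather than one call per key.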
