Hi all,

Is there any more feedback I can incorporate before calling a vote?

Thanks,
Alan

On Tue, Jan 28, 2025 at 1:50 PM Alan Sheinberg <asheinb...@confluent.io> wrote:

> Hi Fabian,
>
> I addressed your comments below.
>
>> * BundledKeySegment
>>   * should it be accumulatorS instead of accumulator?
>>   * should accumulators be null or an empty list if no accumulator is
>>     present?
>
> Good catch, forgot to update those places. It should be "accumulators"
> and an empty list when there are none. Updated in the FLIP.
>
>> * update the Group By Example to use a list of accumulators instead of a
>>   single value
>
> Updated.
>
>> * fix the `AvgAggregate` example:
>>   * add missing `canRetract()` and `canMerge()` methods
>>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>>   * example works with a single accumulator, not a list of accumulators
>
> Updated AvgAggregate to address these things. They should be consistent
> with the interface discussed.
>
>> * in general, check the code examples for compliance with the proposed API.
>>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>>   * There might be other mistakes that sneaked in when evolving the API
>
> Yeah, you're right. I'll read through it again and try to find any
> missing updates.
>
> Thanks,
> Alan
>
> On Tue, Jan 28, 2025 at 3:58 AM Fabian Hüske <fhue...@confluent.io.invalid> wrote:
>
>> Thanks Alan for updating the FLIP!
>>
>> IMO, it looks good.
>>
>> Just a few nits that would be great to fix for consistency:
>>
>> * BundledKeySegment
>>   * should it be accumulatorS instead of accumulator?
>>   * should accumulators be null or an empty list if no accumulator is
>>     present?
>> * update the Group By Example to use a list of accumulators instead of a
>>   single value
>> * fix the `AvgAggregate` example:
>>   * add missing `canRetract()` and `canMerge()` methods
>>   * example uses `batch()` method instead of `bundledAccumulateRetract()`
>>   * example works with a single accumulator, not a list of accumulators
>> * in general, check the code examples for compliance with the proposed API.
>>   * Some use `bundle()` instead of `bundledAccumulateRetract()`.
>>   * There might be other mistakes that sneaked in when evolving the API
>>
>> Thank you,
>> Fabian
>>
>> On Tue, Jan 28, 2025 at 12:54 AM Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:
>>
>> > Hi everyone,
>> >
>> > Sorry for the delayed response! I appreciate the comments.
>> >
>> > Addressing Timo's comments:
>> >
>> > > Correct me if I'm wrong, but for AggregateFunctions implementing a
>> > > retract() and merge() is optional. How can a BundledAggregateFunction
>> > > communicate whether or not this is supported to the planner? Enforcing
>> > > the retract() feature in the interface specification could be an option,
>> > > but esp for window aggregations there might not be a retract required.
>> >
>> > This is a good catch. Much of my experimenting with a POC was done with a
>> > retract call and it slipped my mind that it's optional. I think this will
>> > have to be added to the interface BundledAggregateFunction.
>> >
>> > > Also how do you plan to support merge() in this design? I couldn't find
>> > > any mention in the FLIP.
>> >
>> > Searching through operators which used merge, I wasn't clear that I would
>> > require it in the implementation, so I didn't think it required support. I
>> > now see it's used in windows and maybe elsewhere. I'll add a list of
>> > accumulators rather than a single one -- the first step will be to merge
>> > accumulators before applying any of the accumulate or retract calls.
>> > I need to look more closely at the operators that will use them, but think it may
>> > make sense to do in this way. Tell me if you feel strongly that they
>> > should be separate method calls.
>> >
>> > Addressing Fabian's comments:
>> >
>> > > * Why do we need the `canBundle()` function? We can use the interface
>> > >   itself as a marker. A function that can't bundle, shouldn't implement the
>> > >   interface.
>> > >   * the interface could just contain the
>> > >     `bundledAccumulateRetract(List<BKS> bundle)` method?
>> >
>> > I originally had it just like you're recommending, but it removes a layer
>> > of indirection and removes some flexibility because it means that a class
>> > must statically decide if it's an aggregate bundled function or not.
>> > FunctionDefinition doesn't work this way -- for example at planning time
>> > you can determine what function kind you are rather than by directly
>> > extending ScalarFunction, for example. This flexibility allows us to
>> > implement a meta UDF which can try to be any kind of UDFs and then
>> > specialize during runtime. For example, this would allow it only to return
>> > canBundle when it's an aggregate which wants to implement the bundled
>> > interface.
>> >
>> > > * why does a function call need to handle multiple keys?
>> > >   * It feels that the key-loop is something that 90% of the functions would
>> > >     implement the same.
>> > >   * Couldn't we have a default implementation for the key loop and another
>> > >     method that just handles the bundle of a single key?
>> >
>> > I think so. So you mean you could overload multiple methods, one for
>> > single key and another for multi key and the default for the latter would
>> > just call the former N times? That would be fine. In some cases, you
>> > would want to override the multi key version, to minimize external calls,
>> > for example, but others might not.
>> >
>> > > * does the list of updatedValuesAfterEachRow also include the last row?
>> > >   * it would be good to clarify that in the API documentation
>> >
>> > It includes the first and last value (and everything in between), so
>> > duplicates those fields also set in startingValue and finalValue. I can be
>> > more explicit about this field.
>> >
>> > > * Could you clarify what the table.exec.agg-bundled.size parameter refers
>> > >   to?
>> > >   * Number of keys? Total number of rows? Number of rows per key?
>> >
>> > Number of total input rows which may be for one or more keys. Let me
>> > clarify that.
>> >
>> > > * It would be nice to add the keys in the example
>> >
>> > You're right. It's a bit confusing without them. I'll add them.
>> >
>> > > * How do the bundled agg functions interact with checkpointing?
>> > >   * Are bundles sent out when a checkpoint barrier is received?
>> >
>> > Yes, for group by, it would flush the bundle when a checkpoint barrier is
>> > received. I think we would ultimately want to do this async so as not to
>> > block the checkpoint (in case the bundle takes a while), but the first
>> > implementation will likely be straightforward. I can make a followup task
>> > for this.
>> >
>> > For the over implementations, processElement uses ValueState and
>> > MapState to store all the intermediate data and uses watermark eventTimers
>> > to actually do the bundle calls. I think for these, there is no special
>> > behavior for checkpoints and shouldn't hold them up at all.
>> >
>> > For the windowed aggregate implementations, it would also flush the bundle
>> > when a checkpoint barrier is received. I think this will also ultimately
>> > need to be made async as well, but will have a first implementation which
>> > happens when the barrier is received.
>> >
>> > I'll update the FLIP to address the comments above and please have another
>> > look.
>> >
>> > Thanks,
>> > Alan
>> >
>> > On Wed, Jan 15, 2025 at 9:06 AM Fabian Hüske <fhue...@confluent.io.invalid> wrote:
>> >
>> > > Hi Alan,
>> > >
>> > > Thanks for working on this FLIP!
>> > > Overall the document looks very good, but I have a few questions / comments:
>> > >
>> > > * Why do we need the `canBundle()` function? We can use the interface
>> > >   itself as a marker. A function that can't bundle, shouldn't implement the
>> > >   interface.
>> > >   * the interface could just contain the
>> > >     `bundledAccumulateRetract(List<BKS> bundle)` method?
>> > >
>> > > * why does a function call need to handle multiple keys?
>> > >   * It feels that the key-loop is something that 90% of the functions would
>> > >     implement the same.
>> > >   * Couldn't we have a default implementation for the key loop and another
>> > >     method that just handles the bundle of a single key?
>> > >
>> > > * does the list of updatedValuesAfterEachRow also include the last row?
>> > >   * it would be good to clarify that in the API documentation
>> > >
>> > > * Could you clarify what the table.exec.agg-bundled.size parameter refers
>> > >   to?
>> > >   * Number of keys? Total number of rows? Number of rows per key?
>> > >
>> > > * It would be nice to add the keys in the example
>> > >
>> > > * How do the bundled agg functions interact with checkpointing?
>> > >   * Are bundles sent out when a checkpoint barrier is received?
>> > >
>> > > Thank you, Fabian
>> > >
>> > > On Mon, Jan 13, 2025 at 3:42 PM Timo Walther <twal...@apache.org> wrote:
>> > >
>> > > > Hi Alan,
>> > > >
>> > > > thanks for sharing this FLIP with us. Sorry for the late reply. I think
>> > > > the FLIP is already in a very good shape.
>> > > >
>> > > > I like the approach that the BundledAggregateFunction is rather a
>> > > > separate interface that can be implemented by advanced users.
>> > > > This matches with the existing SpecializedFunction interface and the
>> > > > upcoming ChangelogFunction of FLIP-440.
>> > > >
>> > > > However, I have some additional feedback:
>> > > >
>> > > > Correct me if I'm wrong, but for AggregateFunctions implementing a
>> > > > retract() and merge() is optional. How can a BundledAggregateFunction
>> > > > communicate whether or not this is supported to the planner? Enforcing
>> > > > the retract() feature in the interface specification could be an option,
>> > > > but esp for window aggregations there might not be a retract required.
>> > > >
>> > > > Also how do you plan to support merge() in this design? I couldn't find
>> > > > any mention in the FLIP.
>> > > >
>> > > > Regards,
>> > > > Timo
>> > > >
>> > > > On 12.12.24 02:57, Alan Sheinberg wrote:
>> > > > > I'd like to start a discussion of FLIP-491: BundledAggregateFunction for
>> > > > > batched aggregation [1]
>> > > > >
>> > > > > This feature proposes adding a new interface BundledAggregateFunction that
>> > > > > can be implemented by AggregateFunction UDFs. This allows the use of a
>> > > > > batched method call so that users can handle many rows at a time for
>> > > > > multiple keys rather than the per-row calls such as accumulate and retract.
>> > > > >
>> > > > > The purpose is to achieve high throughput while still allowing for calls to
>> > > > > external systems or other blocking operations. Similar calls through the
>> > > > > conventional AggregateFunction methods would be prohibitively slow, but if
>> > > > > given a batch of inputs and accumulators for each key, the implementer has
>> > > > > the power to parallelize or internally batch lookups to improve performance.
>> > > > >
>> > > > > Looking forward to your feedback and suggestions.
>> > > > >
>> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-491%3A+BundledAggregateFunction+for+batched+aggregation
>> > > > >
>> > > > > Thanks,
>> > > > > Alan
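
For readers following the thread, here is a rough Java sketch of two ideas discussed above: a default multi-key loop that delegates to a single-key method, and merging the list of accumulators before applying accumulate/retract rows. It is not the FLIP's actual API. The types `RowChange`, `KeySegment`, `SegmentResult`, `BundledAggregateSketch` and `AvgSketch` are hypothetical stand-ins, and only the method name `bundledAccumulateRetract` comes from the discussion. Recording one value per applied row is likewise an assumption about how the per-row values might look.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical row change: a value that is either accumulated or retracted. */
final class RowChange {
    final long value;
    final boolean retract;
    RowChange(long value, boolean retract) { this.value = value; this.retract = retract; }
}

/** Hypothetical per-key segment: accumulators to merge (empty list if none) plus rows. */
final class KeySegment {
    final String key;
    final List<long[]> accumulators; // each accumulator is {sum, count}
    final List<RowChange> rows;
    KeySegment(String key, List<long[]> accumulators, List<RowChange> rows) {
        this.key = key; this.accumulators = accumulators; this.rows = rows;
    }
}

/** Hypothetical per-key result: merged-and-updated accumulator plus per-row values. */
final class SegmentResult {
    final String key;
    final long[] accumulator;
    final List<Double> valuesAfterEachRow; // null entry means "no rows left" (count == 0)
    SegmentResult(String key, long[] accumulator, List<Double> valuesAfterEachRow) {
        this.key = key; this.accumulator = accumulator; this.valuesAfterEachRow = valuesAfterEachRow;
    }
}

/** Sketch of an interface with a default key loop, as suggested in the thread. */
interface BundledAggregateSketch {
    /** Handles the bundle of a single key. */
    SegmentResult applySingleKey(KeySegment segment);

    /** Multi-key entry point; override it to batch external calls across keys. */
    default List<SegmentResult> bundledAccumulateRetract(List<KeySegment> bundle) {
        List<SegmentResult> results = new ArrayList<>();
        for (KeySegment segment : bundle) {
            results.add(applySingleKey(segment));
        }
        return results;
    }
}

/** Average aggregate that only implements the single-key method. */
final class AvgSketch implements BundledAggregateSketch {
    @Override
    public SegmentResult applySingleKey(KeySegment segment) {
        long sum = 0;
        long count = 0;
        // Step 1: merge all incoming accumulators into one.
        for (long[] acc : segment.accumulators) {
            sum += acc[0];
            count += acc[1];
        }
        // Step 2: apply accumulate/retract rows in order, recording each intermediate value.
        List<Double> values = new ArrayList<>();
        for (RowChange row : segment.rows) {
            int sign = row.retract ? -1 : 1;
            sum += sign * row.value;
            count += sign;
            values.add(count == 0 ? null : Double.valueOf((double) sum / count));
        }
        return new SegmentResult(segment.key, new long[] {sum, count}, values);
    }
}
```

In this sketch, a function that calls an external system could override `bundledAccumulateRetract` to issue one request for all keys, while simple functions would implement only `applySingleKey`.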