Hi Alex,

On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:

> My point here is that, when we're only interested in a max/min numeric
> value, it doesn't matter when we have repeated values, since we'd only be
> forwarding the number downstream, so I could disregard when the Comparator
> returns a zero value (meaning equals) and min/max would still be
> semantically correct. But when we're forwarding the original object
> downstream instead of its numeric property, it could mean different things
> semantically depending on how we handle the repeated values.
>
> As an example, if I were using max() on a stream of Biddings for products
> in an auction, the order of the biddings would probably influence the
> winner if two clients send Biddings with the same value. If we're only
> forwarding the Bidding value downstream (a double value of 100, for
> example), it doesn't matter how repeated values are handled, since the max
> price for this auction would still be 100.00, no matter what Bidding got
> selected in the end. But if we're forwarding the Biddings downstream
> instead, then it matters whether the winning Bidding sent downstream was
> originally posted by Client A or Client B.
>
> I'm not saying that an overloaded method to handle different options for
> how repeated values should be handled by min/max is mandatory, but it
> should be clear in the methods' docs what would happen when
> Comparator.compare() == 0. My preferred option for the default behaviour is
> to only forward a new value if it is smaller/bigger than the previous
> min/max value (ignoring compare() == 0), since it would emit fewer values
> downstream and would be easier to read ("I only send a value downstream if
> it's bigger/smaller than the previously selected value").

Thanks for the clarification. I like your suggestion unless someone feels
that they want an option to control this (i.e., when compare() == 0, return
the old value vs new value).

> > Not knowing the schema of the value (V) has its own set of problems.
> > As I have alluded to in the proposal, this is a little bit messy. We
> > already have "reduce" which can be used to implement sum
> > (mapValues().reduce()). Thinking about it more, do you think "sum" would
> > be useful? One hacky way to implement this is to inspect the type of the
> > return when the "func" is called the first time OR infer from the
> > materialized or have an explicit initializer.
>
> I think it might be useful for some use cases, yes, but it would be tricky
> to implement this in a way that handles generic Numbers and keeps their
> original implementation class. One simplification you could take is fixing
> VR to be a Double, and then using Number.doubleValue() to compute the sum.

Yeah, that would simplify quite a bit. I think you are suggesting this:

KTable<K,Double> sum(Function<V, Number> func)

> What you said about using reduce() to compute a sum() is also true for
> min() and max(). =) All three methods in this KIP would be syntactic sugar
> for what could otherwise be implemented using reduce/aggregate, but I see
> value in implementing them and simplifying the adoption of those use cases.

Agreed. I seem to have forgotten the reason as to why I started this KIP
:-). There is a long way to go.

-thanks
Mohan

> Best regards,
> Alexandre
>
> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com>
> wrote:
>
> > Hi Alex,
> >
> > Responses below.
> >
> > On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil
> > <alexandre.bra...@gmail.com> wrote:
> >
> > > Hi Mohan,
> > >
> > > I like the idea of adding those methods to the API, but I'd like to
> > > make a suggestion:
> > >
> > > Although the most used scenario for min() / max() might possibly be
> > > for numeric values, I think they could also be useful on other
> > > objects like Dates, LocalDates or Strings. Why limit the API to
> > > Numbers only?
> >
> > There was no specific reason. Just addressing the common scenario.
> > But I don't see why this can't be supported given your suggestion below.
> >
> > > Extending on the above, couldn't we change the API to provide a
> > > Comparator<V> instead of the Function<V, VR> for those methods, and
> > > make them return a KTable<K, V> instead? Not only would this approach
> > > not limit the usage of those methods to Numbers, but they'd also
> > > preserve the origin from the min/max value [1]. The extraction of a
> > > single (numeric?) value could be achieved by a subsequent .mapValues()
> > > operator, and this strategy would also allow us to reuse the stream's
> > > current value serde on min / max, making the Materialized an optional
> > > parameter.
> >
> > I like your idea though it is odd that min/max returns KTable<K, V>
> > instead of the KTable<K, VR> (like in count), but mapValues should do the
> > trick.
> >
> > > One extra complication of this approach is that now we'd have to
> > > handle repeated min/max values from different origins (two
> > > semantically different objects for which the comparator returns 0),
> > > but we could solve that by adding a parameter to specify whether to
> > > use the older or newer value (or assuming one of these options as
> > > default for a simpler API?).
> >
> > I am not sure whether this complexity is warranted. Why can't we just
> > stick to the way a regular Comparator works? Can you give me a real-world
> > example?
> >
> > > I know it's an implementation issue, but I'm curious about how you'd
> > > solve handling the <VR extends Number> on the sum(). Since the
> > > multiple implementations of this interface don't have a common
> > > constructor nor an interface method to add two Numbers, would it be
> > > possible to implement sum() and retain the original VR type on the
> > > returned KTable?
> >
> > Not knowing the schema of the value (V) has its own set of problems.
> > As I have alluded to in the proposal, this is a little bit messy. We
> > already have "reduce" which can be used to implement sum
> > (mapValues().reduce()). Thinking about it more, do you think "sum" would
> > be useful? One hacky way to implement this is to inspect the type of the
> > return when the "func" is called the first time OR infer from the
> > materialized or have an explicit initializer.
> >
> > Thanks
> > Mohan
> >
> > > [1]: An example scenario for this would be to find the min / max
> > > Bidding for a product where, at the end of the auction, I need not
> > > only the min / max value of said Bidding, but also the bidder's
> > > contact information.
> > >
> > > Best,
> > > Alexandre
> > >
> > > On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy
> > > <mposde...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have created a proposal for adding some additional aggregation
> > > > APIs like count.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > > >
> > > > I have noted down some of the issues that need discussion. Thanks to
> > > > Matthias for helping me with the scope of the proposal.
> > > >
> > > > Thanks
> > > > Mohan
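
For reference, the two implementation points discussed in this thread (what to do when compare() == 0, and fixing the sum type to Double via Number.doubleValue()) can be sketched in plain Java. This is only an illustration under assumptions: AggregationSketch, Bid, maxKeepingOlder, maxKeepingNewer and sum are hypothetical names, none of them come from KIP-747 or from the Kafka Streams API, and the reducers are ordinary BinaryOperator lambdas rather than anything wired into a KTable.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustrative sketch only: these names are hypothetical and are not part of
// KIP-747 or the Kafka Streams API.
final class AggregationSketch {

    // Hypothetical value type for the auction example from the thread.
    static final class Bid {
        final String client;
        final double price;

        Bid(String client, double price) {
            this.client = client;
            this.price = price;
        }
    }

    // Replaces the current max only when the candidate is strictly greater,
    // so the older value is kept when compare() == 0.
    static <V> BinaryOperator<V> maxKeepingOlder(Comparator<? super V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) > 0 ? candidate : current;
    }

    // Alternative tie policy: the newer value wins when compare() == 0.
    static <V> BinaryOperator<V> maxKeepingNewer(Comparator<? super V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) >= 0 ? candidate : current;
    }

    // Sum with the result type fixed to double, using Number.doubleValue()
    // on whatever Number the extractor function returns.
    static <V> double sum(List<V> values, Function<? super V, ? extends Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        // Clients A and B bid the same price; A's bid arrives first.
        List<Bid> bids = List.of(new Bid("A", 100.0), new Bid("B", 100.0), new Bid("C", 90.0));
        Comparator<Bid> byPrice = Comparator.comparingDouble(b -> b.price);

        Bid older = bids.stream().reduce(maxKeepingOlder(byPrice)).get();
        Bid newer = bids.stream().reduce(maxKeepingNewer(byPrice)).get();

        System.out.println("keep older on tie: " + older.client);
        System.out.println("keep newer on tie: " + newer.client);
        System.out.println("sum of prices: " + sum(bids, b -> b.price));
    }
}
```

With the tied bids above, the strict comparison keeps Client A's bid while the non-strict one selects Client B's, which is the semantic difference the method docs would need to spell out. Summing through doubleValue() sidesteps the missing "add two Numbers" operation, at the cost of not preserving the original Number subclass.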