Hi Mohan,

> I like your idea though it is odd that min/max returns KTable<K, V> instead of the
> KTable<K, VR> (like in count), but mapValues should do the trick.

My line of thought is that min/max would forward the min/max V associated with each K downstream. That differs from count, which always forwards a positive integer representing the number of occurrences.

> I am not sure whether this complexity is warranted. Why can't we just stick to the way
> a regular Comparator works ? Can you give me a real world example ?

My point is this. When we're only interested in a min/max numeric value, repeated values don't matter, because we'd only be forwarding the number downstream. I could ignore the case where the Comparator returns zero (meaning equal), and min/max would still be semantically correct. But when we forward the original object downstream instead of its numeric property, the way we handle repeated values can change the result semantically.

As an example, suppose I were using max() on a stream of Biddings for products in an auction. If two clients send Biddings with the same value, the order of the Biddings would probably decide the winner. If we're only forwarding the Bidding value downstream (a double value of 100, for example), it doesn't matter how repeated values are handled: the max price for this auction would still be 100.00, no matter which Bidding got selected in the end. But if we're forwarding the Biddings themselves downstream, then it matters whether the winning Bidding was originally posted by Client A or Client B.

I'm not saying that an overloaded method with different options for handling repeated values in min/max is mandatory. However, the methods' docs should state clearly what happens when Comparator.compare() == 0.

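To make the tie-handling question concrete, here is a minimal plain-Java sketch of a max() built from a Comparator<V>. It has no Kafka dependency. Everything in it is hypothetical and only for illustration: the `Bidding` record, the `TiePolicy` enum and the `maxBy`/`fold` helpers are not part of Kafka Streams or KIP-747. The reducer takes (current aggregate, new value) and returns the new aggregate. That is the same shape as Kafka Streams' `Reducer<V>`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

public class MaxByComparator {

    // Hypothetical value type standing in for the "Bidding" in the auction example.
    record Bidding(String client, double amount) {}

    // Hypothetical policy for what to do when compare() returns 0.
    enum TiePolicy { KEEP_EXISTING, TAKE_NEWER }

    // Builds a reducer that keeps the larger of (current aggregate, incoming value).
    static <V> BinaryOperator<V> maxBy(Comparator<V> cmp, TiePolicy policy) {
        return (current, incoming) -> {
            int c = cmp.compare(incoming, current);
            if (c > 0) {
                return incoming;            // strictly bigger: new max
            }
            if (c == 0 && policy == TiePolicy.TAKE_NEWER) {
                return incoming;            // tie: the newer value wins
            }
            return current;                 // smaller, or tie kept as-is
        };
    }

    // Folds values in arrival order, as a per-key aggregation would (list must be non-empty).
    static <V> V fold(List<V> values, BinaryOperator<V> reducer) {
        V acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = reducer.apply(acc, values.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(
            new Bidding("A", 90.0),
            new Bidding("A", 100.0),
            new Bidding("B", 100.0));
        Comparator<Bidding> byAmount = Comparator.comparingDouble(Bidding::amount);

        Bidding keep = fold(bids, maxBy(byAmount, TiePolicy.KEEP_EXISTING));
        Bidding newer = fold(bids, maxBy(byAmount, TiePolicy.TAKE_NEWER));

        // Same max amount either way; only the winning Bidding differs.
        System.out.println(keep);   // Bidding[client=A, amount=100.0]
        System.out.println(newer);  // Bidding[client=B, amount=100.0]
    }
}
```

KEEP_EXISTING only replaces the aggregate on a strictly bigger value, so the tie at 100.0 keeps Client A's Bidding. TAKE_NEWER hands the win to Client B instead. Suppose min/max were built on top of the existing reduce(). In that case, the same lambda could presumably be passed to `groupByKey().reduce(...)` as a `Reducer<V>`, and a later `mapValues(Bidding::amount)` would extract the numeric value.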
My preferred default behaviour is to forward a new value only if it's smaller/bigger than the previous min/max value (ignoring compare() == 0). It would emit fewer values downstream and would be easier to understand ("I only send a value downstream if it's bigger/smaller than the previously selected value").

> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to
> in the proposal, this is a little bit messy. We already have "reduce" which can be used to
> implement sum (mapValues().reduce()).
> Thinking about it more, do you think "sum" would be useful ? One hacky way to implement
> this is to inspect the type of the return when the "func" is called the first time OR infer from
> the materialized or have an explicit initializer.

Yes, I think it might be useful for some use cases. But it would be tricky to implement in a way that handles generic Numbers and keeps their original implementation class. One simplification would be to fix VR as Double and then use Number.doubleValue() to compute the sum.

What you said about using reduce() to compute sum() is also true for min() and max(). =) All three methods in this KIP would be syntactic sugar for what could otherwise be implemented with reduce/aggregate. Still, I see value in implementing them, since they would simplify the adoption of those use cases.

Best regards,
Alexandre

On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:

> Hi Alex,
>
> Responses below.
>
> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> wrote:
>
> > Hi Mohan,
> >
> > I like the idea of adding those methods to the API, but I'd like to make a
> > suggestion:
> >
> > Although the most used scenario for min() / max() might possibly be for
> > numeric values, I think they could also be useful on other objects like
> > Dates, LocalDates or Strings. Why limit the API to Numbers only?
> >
>
> There was no specific reason.
> Just addressing the common scenario. But I don't see why this can't be
> supported given your suggestion below.
>
> > Extending on the above, couldn't we change the API to provide a
> > Comparator<V> instead of the Function<V, VR> for those methods, and make
> > them return a KTable<K, V> instead? Not only would this approach not limit
> > the usage of those methods to Numbers, but they'd also preserve the origin
> > from the min/max value [1]. The extraction of a single (numeric?) value
> > could be achieved by a subsequent .mapValues() operator, and this strategy
> > would also allow us to reuse the stream's current value serde on min / max,
> > making the Materialized an optional parameter.
> >
>
> I like your idea though it is odd that min/max returns KTable<K, V>
> instead of the KTable<K, VR> (like in count), but mapValues should do the
> trick.
>
> > One extra complication of this approach is that now we'd have to handle
> > repeated min/max values from different origins (two semantically different
> > objects for which the comparator returns 0), but we could solve that by
> > adding a parameter to specify whether to use the older or newer value (or
> > assuming one of these options as default for a simpler API?).
> >
>
> I am not sure whether this complexity is warranted. Why can't we just
> stick to the way a regular Comparator works ? Can you give me a real world
> example ?
>
> > I know it's an implementation issue, but I'm curious on how you'd solve
> > handling the <VR extends Number> on the sum(). Since the multiple
> > implementations of this interface don't have a common constructor nor an
> > interface method to add two Numbers, would it be possible to implement
> > sum() and retain the original VR type on the returned KTable?
> >
>
> Not knowing the schema of the value (V) has its own set of problems. As I
> have alluded to in the proposal, this is a little bit messy.
> We already have "reduce" which can be used to implement sum
> (mapValues().reduce()). Thinking about it more, do you think "sum" would be
> useful ? One hacky way to implement this is to inspect the type of the
> return when the "func" is called the first time OR infer from the
> materialized or have an explicit initializer.
>
> Thanks
> Mohan
>
> > [1]: An example scenario for this would be to find the min / max Bidding
> > for a product where, at the end of the auction, I need not only the
> > min / max value of said Bidding, but also the bidder's contact information.
> >
> > Best,
> > Alexandre
> >
> > On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have created a proposal for adding some additional aggregation APIs
> > > like count.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > >
> > > I have noted down some of the issues that need discussion. Thanks to
> > > Matthias for helping me with the scope of the proposal.
> > >
> > > Thanks
> > > Mohan