Mohan / Matthias,

> > I think extending min/max to non-numeric types makes sense. Wondering
> > why we should require a `Comparator` or if we should require that the
> > types implement `Comparable` instead?
> >
> Good question. This is what it would look like:
>
> KTable<K, V> min_comparable()
> KTable<K, V> min_comparator(Comparator<V> comp)

Not sure if I understood Matthias' proposal correctly, but I think that
instead of going with your original proposal (<VR extends Number>
KTable<K, VR> min(Function<V, VR> func...) or mine (KTable<K, V>
min(Comparator<V> comparator...), we could simplify it a bit by using a
function to extract a Comparable property from the original value:

KTable<K, V> min(Function<V, Comparable<?>> func...)

> I also think, that min/max should not change the value type. Using
> `Long` for sum() make sense though, and also to require a `<V extends
> Number>`.

Are there any reasons to limit the sum() to integers? Why not use a Double
instead?

Best regards,
Alexandre

On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com>
wrote:

> Matthias,
>
> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid>
> wrote:
>
> > Hi,
> >
> > I think extending min/max to non-numeric types makes sense. Wondering
> > why we should require a `Comparator` or if we should require that the
> > types implement `Comparable` instead?
> >
> Good question. This is what it would look like:
>
> KTable<K, V> min_comparable()
> KTable<K, V> min_comparator(Comparator<V> comp)
>
> For min_comparable to work, you still need the bounds "V extends
> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
> warning, it has to be at the interface level like this:
>
> KStream<K, V extends Comparable<V>>
>
> which is a little messy unless there is a different way to do the same. The
> comparator gives a simple way to define an anonymous function.
>
> What do you think ?
>
> > I also think, that min/max should not change the value type. Using
> > `Long` for sum() make sense though, and also to require a `<V extends
> > Number>`.
> >
> I guess these are the two possibilities:
>
> <E extends Number> Long sum(Function<V, E> func)
> Long sum(Function<V, Number> func)
>
> Both should work.
> "func" can return any subtypes of Number and I don't see
> any advantages with the first version. Can you clarify ?
>
> Thanks
> Mohan
>
> >
> > -Matthias
> >
> > On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > > Hi Alex,
> > >
> > > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com>
> > > wrote:
> > >
> > >>
> > >> My point here is that, when we're only interested in a max/min numeric value, it doesn't
> > >> matter when we have repeated values, since we'd be only forwarding the number downstream,
> > >> so I could disregard when the Comparator returns a zero value (meaning equals) and min/max
> > >> would still be semantically correct. But when we're forwarding the original object downstream
> > >> instead of its numeric property, it could mean different things semantically depending on how
> > >> we handle the repeated values.
> > >>
> > >> As an example, if I were using max() on a stream of Biddings for products in an auction, the
> > >> order of the biddings would probably influence the winner if two clients send Biddings with the
> > >> same value. If we're only forwarding the Bidding value downstream (a double value of 100, for
> > >> example), it doesn't matter how repeated values are handled, since the max price for this
> > >> auction would still be 100.00, no matter what Bidding got selected in the end. But if we're
> > >> forwarding the Biddings downstream instead, then it matters whether the winning Bidding sent
> > >> downstream was originally posted by Client A or Client B.
> > >>
> > >> I'm not saying that an overloaded method to handle different options for how repeated values
> > >> should be handled by min/max is mandatory, but it should be clear on the methods' docs
> > >> what would happen when Comparator.compare() == 0.
> > >> My preferred option for the default
> > >> behaviour is to only forward a new value is smaller/bigger than the previous min/max value
> > >> (ignoring compare() == 0), since it would emit less values downstream and would be easier
> > >> to read ("I only send a value downstream if it's bigger/smaller than the previously selected
> > >> value").
> > >>
> > > Thanks for the clarification. I like your suggestion unless someone feels
> > > that they want an option to control this (i.e., when compare() == 0, return
> > > the old value vs new value).
> > >
> > >>
> > >>> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to
> > >>> in the proposal, this is a little bit messy. We already have "reduce" which can be used to
> > >>> implement sum (mapValues().reduce()).
> > >>> Thinking about it more, do you think "sum" would be useful ? One hacky way to implement
> > >>> this is to inspect the type of the return when the "func" is called the first time OR infer from
> > >>> the materialized or have an explicit initializer.
> > >>
> > >> I think it might be useful for some use cases, yes, but it would be tricky to implement this in a
> > >> way that handles generic Numbers and keeps their original implementation class. One
> > >> simplification you could take is fixating VR to be a Double, and then use Number.doubleValue()
> > >> to compute the sum.
> > >>
> > >
> > > Yeah, that would simplify quite a bit. I think you are suggesting this:
> > >
> > > KTable<K,Double> sum(Function<V, Number> func)
> > >
> > >
> > >> What you said about using reduce() to compute a sum() is also true for min() and max().
> > >> =) All three methods in this KIP would be a syntactic sugar for what could
> > >> otherwise be implemented using reduce/aggregate, but I see value in implementing
> > >> them and simplifying the adoption of those use cases.
> > >>
> > > Agreed. I seem to have forgotten the reason as to why I started this KIP
> > > :-). There is a long way to go.
> > >
> > > -thanks
> > > Mohan
> > >
> > >> Best regards,
> > >> Alexandre
> > >>
> > >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi Alex,
> > >>>
> > >>> Responses below.
> > >>>
> > >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Mohan,
> > >>>>
> > >>>> I like the idea of adding those methods to the API, but I'd like to make a
> > >>>> suggestion:
> > >>>>
> > >>>> Although the most used scenario for min() / max() might possibly be for
> > >>>> numeric values, I think they could also be
> > >>>> useful on other objects like Dates, LocalDates or Strings. Why limit the
> > >>>> API to Numbers only?
> > >>>>
> > >>> There was no specific reason. Just addressing the common scenario. But I
> > >>> don't see why this can't be supported given your suggestion below.
> > >>>
> > >>>> Extending on the above, couldn't we change the API to provide a
> > >>>> Comparator<V> instead of the Function<V, VR>
> > >>>> for those methods, and make them return a KTable<K, V> instead? Not only
> > >>>> would this approach not limit the
> > >>>> usage of those methods to Numbers, but they'd also preserve the origin from
> > >>>> the min/max value [1]. The extraction of
> > >>>> a single (numeric?)
> > >>>> value could be achieved by a subsequent .mapValues()
> > >>>> operator, and this strategy would also
> > >>>> allow us to reuse the stream's current value serde on min / max, making the
> > >>>> Materialized an optional parameter.
> > >>>>
> > >>> I like your idea though it is odd that min/max returns KTable<K, V>
> > >>> instead of the KTable<K, VR> (like in count), but mapValues should do the
> > >>> trick.
> > >>>
> > >>>> One extra complication of this approach is that now we'd have to handle
> > >>>> repeated min/max values from different
> > >>>> origins (two semantically different objects for which the comparator
> > >>>> returns 0), but we could solve that by adding
> > >>>> a parameter to specify whether to use the older or newer value (or assuming
> > >>>> one of these options as default for a
> > >>>> simpler API?).
> > >>>>
> > >>> I am not sure whether this complexity is warranted. Why can't we just
> > >>> stick to the way a regular Comparator works ? Can you give me a real world
> > >>> example ?
> > >>>
> > >>>>
> > >>>> I know it's an implementation issue, but I'm curious on how you'd solve
> > >>>> handling the <VR extends Number> on
> > >>>> the sum(). Since the multiple implementations of this interface don't have
> > >>>> a common constructor nor an interface
> > >>>> method to add two Numbers, would it be possible to implement sum() and
> > >>>> retain the original VR type on the
> > >>>> returned KTable?
> > >>>>
> > >>>
> > >>> Not knowing the schema of the value (V) has its own set of problems. As I
> > >>> have alluded to in the proposal, this is a little bit messy. We already
> > >>> have "reduce" which can be used to implement sum (mapValues().reduce()).
> > >>> Thinking about it more, do you think "sum" would be useful ?
> > >>> One hacky way
> > >>> to implement this is to inspect the type of the return when the "func" is
> > >>> called the first time OR infer from the materialized or have an explicit
> > >>> initializer.
> > >>>
> > >>> Thanks
> > >>> Mohan
> > >>>
> > >>>
> > >>>> [1]: An example scenario for this would be to find the min / max Bidding
> > >>>> for a product where, at the end of the
> > >>>> auction, I need not only the min / max value of said Bidding, but also the
> > >>>> bidder's contact information.
> > >>>>
> > >>>> Best,
> > >>>> Alexandre
> > >>>>
> > >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I have created a proposal for adding some additional aggregation APIs like
> > >>>>> count.
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> > >>>>>
> > >>>>> I have noted down some of the issues that need discussion. Thanks to
> > >>>>> Matthias for helping me with the scope of the proposal.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Mohan
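
To make the two points debated in this thread concrete, here is a minimal sketch in plain Java (no Kafka Streams dependency) of the tie rule for min/max when compare() == 0 and of a sum() fixed to Double via Number.doubleValue(). Everything named here is an assumption for illustration only: the Bidding record, the helpers maxKeepingFirst / minKeepingFirst / sum, and the use of a List in place of a stream of records per key. It is not the KIP-747 API, just one way the reducers discussed above could look.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class AggregationSketch {

    // Hypothetical value type for the auction example from the thread.
    record Bidding(String client, double amount) {}

    // Reducer for max(): the candidate replaces the current value only when it
    // is strictly greater, so compare() == 0 keeps the previously selected value.
    static <V> BinaryOperator<V> maxKeepingFirst(Comparator<V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) > 0 ? candidate : current;
    }

    // Reducer for min(), using the same tie rule.
    static <V> BinaryOperator<V> minKeepingFirst(Comparator<V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) < 0 ? candidate : current;
    }

    // sum() with the result type fixed to double: whatever Number subtype the
    // extractor returns is widened through Number.doubleValue().
    static <V> double sum(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(
                new Bidding("Client A", 100.0),
                new Bidding("Client B", 100.0),
                new Bidding("Client C", 80.0));
        Comparator<Bidding> byAmount = Comparator.comparingDouble(Bidding::amount);

        // A sequential reduce visits the elements in encounter order, which
        // stands in here for the arrival order of records.
        Bidding max = bids.stream().reduce(maxKeepingFirst(byAmount)).orElseThrow();
        Bidding min = bids.stream().reduce(minKeepingFirst(byAmount)).orElseThrow();

        System.out.println("max: " + max);
        System.out.println("min: " + min);
        System.out.println("sum: " + sum(bids, Bidding::amount));
    }
}
```

With this sample data the tie between Client A and Client B resolves to Client A, because the later bid with an equal amount never replaces the earlier one. Switching the comparisons to >= and <= would give the "newer value wins" behaviour instead, which is the option Mohan mentions as a possible knob.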