Matthias,

On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid> wrote:

> Hi,
>
> I think extending min/max to non-numeric types makes sense. Wondering
> why we should require a `Comparator` or if we should require that the
> types implement `Comparable` instead?
>

Good question. This is what it would look like:

    KTable<K, V> min_comparable()
    KTable<K, V> min_comparator(Comparator<V> comp)

For min_comparable to work, you still need the bound
"V extends Comparable<V>". AFAICT, to avoid the "type parameter V hiding
the type V" warning, it has to be at the interface level like this:

    KStream<K, V extends Comparable<V>>

which is a little messy unless there is a different way to do the same.
The comparator gives a simple way to define an anonymous function. What
do you think?

> I also think that min/max should not change the value type. Using
> `Long` for sum() makes sense though, and also to require a `<V extends
> Number>`.
>

I guess these are the two possibilities:

    <E extends Number> Long sum(Function<V, E> func)
    Long sum(Function<V, Number> func)

Both should work. "func" can return any subtype of Number and I don't
see any advantage in the first version. Can you clarify?

Thanks
Mohan

> -Matthias
>
> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > Hi Alex,
> >
> > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com>
> > wrote:
> >
> >>
> >> My point here is that, when we're only interested in a max/min
> >> numeric value, it doesn't matter when we have repeated values, since
> >> we'd be only forwarding the number downstream, so I could disregard
> >> when the Comparator returns a zero value (meaning equals) and min/max
> >> would still be semantically correct. But when we're forwarding the
> >> original object downstream instead of its numeric property, it could
> >> mean different things semantically depending on how we handle the
> >> repeated values.
> >>
> >> As an example, if I were using max() on a stream of Biddings for
> >> products in an auction, the order of the biddings would probably
> >> influence the winner if two clients send Biddings with the same
> >> value. If we're only forwarding the Bidding value downstream (a
> >> double value of 100, for example), it doesn't matter how repeated
> >> values are handled, since the max price for this auction would still
> >> be 100.00, no matter what Bidding got selected in the end. But if
> >> we're forwarding the Biddings downstream instead, then it matters
> >> whether the winning Bidding sent downstream was originally posted by
> >> Client A or Client B.
> >>
> >> I'm not saying that an overloaded method to handle different options
> >> for how repeated values should be handled by min/max is mandatory,
> >> but it should be clear on the methods' docs what would happen when
> >> Comparator.compare() == 0. My preferred option for the default
> >> behaviour is to only forward a new value if it is smaller/bigger than
> >> the previous min/max value (ignoring compare() == 0), since it would
> >> emit fewer values downstream and would be easier to read ("I only
> >> send a value downstream if it's bigger/smaller than the previously
> >> selected value").
> >>
> > Thanks for the clarification. I like your suggestion unless someone
> > feels that they want an option to control this (i.e., when
> > compare() == 0, return the old value vs new value).
> >
> >
> >>
> >>> Not knowing the schema of the value (V) has its own set of problems.
> >>> As I have alluded to in the proposal, this is a little bit messy. We
> >>> already have "reduce" which can be used to implement sum
> >>> (mapValues().reduce()).
> >>> Thinking about it more, do you think "sum" would be useful?
> >>> One hacky way to implement this is to inspect the type of the return
> >>> when the "func" is called the first time OR infer from the
> >>> materialized or have an explicit initializer.
> >>
> >> I think it might be useful for some use cases, yes, but it would be
> >> tricky to implement this in a way that handles generic Numbers and
> >> keeps their original implementation class. One simplification you
> >> could take is fixating VR to be a Double, and then use
> >> Number.doubleValue() to compute the sum.
> >>
> >
> > Yeah, that would simplify quite a bit. I think you are suggesting this:
> >
> >     KTable<K,Double> sum(Function<V, Number> func)
> >
> >
> >> What you said about using reduce() to compute a sum() is also true
> >> for min() and max(). =) All three methods in this KIP would be a
> >> syntactic sugar for what could otherwise be implemented using
> >> reduce/aggregate, but I see value in implementing them and
> >> simplifying the adoption of those use cases.
> >>
> > Agreed. I seem to have forgotten the reason as to why I started this
> > KIP :-). There is a long way to go.
> >
> > -thanks
> > Mohan
> >
> >> Best regards,
> >> Alexandre
> >>
> >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com>
> >> wrote:
> >>
> >>> Hi Alex,
> >>>
> >>> Responses below.
> >>>
> >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Mohan,
> >>>>
> >>>> I like the idea of adding those methods to the API, but I'd like to
> >>>> make a suggestion:
> >>>>
> >>>> Although the most used scenario for min() / max() might possibly be
> >>>> for numeric values, I think they could also be useful on other
> >>>> objects like Dates, LocalDates or Strings. Why limit the API to
> >>>> Numbers only?
> >>>>
> >>>>
> >>> There was no specific reason. Just addressing the common scenario.
> >>> But I don't see why this can't be supported given your suggestion
> >>> below.
> >>>
> >>>> Extending on the above, couldn't we change the API to provide a
> >>>> Comparator<V> instead of the Function<V, VR> for those methods, and
> >>>> make them return a KTable<K, V> instead? Not only would this
> >>>> approach not limit the usage of those methods to Numbers, but
> >>>> they'd also preserve the origin from the min/max value [1]. The
> >>>> extraction of a single (numeric?) value could be achieved by a
> >>>> subsequent .mapValues() operator, and this strategy would also
> >>>> allow us to reuse the stream's current value serde on min / max,
> >>>> making the Materialized an optional parameter.
> >>>>
> >>> I like your idea though it is odd that min/max returns KTable<K, V>
> >>> instead of the KTable<K, VR> (like in count), but mapValues should
> >>> do the trick.
> >>>
> >>>> One extra complication of this approach is that now we'd have to
> >>>> handle repeated min/max values from different origins (two
> >>>> semantically different objects for which the comparator returns 0),
> >>>> but we could solve that by adding a parameter to specify whether to
> >>>> use the older or newer value (or assuming one of these options as
> >>>> default for a simpler API?).
> >>>>
> >>> I am not sure whether this complexity is warranted. Why can't we
> >>> just stick to the way a regular Comparator works? Can you give me a
> >>> real world example?
> >>>
> >>>>
> >>>> I know it's an implementation issue, but I'm curious on how you'd
> >>>> solve handling the <VR extends Number> on the sum(). Since the
> >>>> multiple implementations of this interface don't have a common
> >>>> constructor nor an interface method to add two Numbers, would it be
> >>>> possible to implement sum() and retain the original VR type on the
> >>>> returned KTable?
> >>>>
> >>>
> >>> Not knowing the schema of the value (V) has its own set of problems.
> >>> As I have alluded to in the proposal, this is a little bit messy. We
> >>> already have "reduce" which can be used to implement sum
> >>> (mapValues().reduce()).
> >>> Thinking about it more, do you think "sum" would be useful? One
> >>> hacky way to implement this is to inspect the type of the return
> >>> when the "func" is called the first time OR infer from the
> >>> materialized or have an explicit initializer.
> >>>
> >>> Thanks
> >>> Mohan
> >>>
> >>>
> >>>> [1]: An example scenario for this would be to find the min / max
> >>>> Bidding for a product where, at the end of the auction, I need not
> >>>> only the min / max value of said Bidding, but also the bidder's
> >>>> contact information.
> >>>>
> >>>> Best,
> >>>> Alexandre
> >>>>
> >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I have created a proposal for adding some additional aggregation
> >>>>> APIs like count.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >>>>>
> >>>>> I have noted down some of the issues that need discussion. Thanks
> >>>>> to Matthias for helping me with the scope of the proposal.
> >>>>>
> >>>>> Thanks
> >>>>> Mohan
> >>>>>
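
For reference, here is a rough, self-contained sketch of the semantics discussed in this thread, in plain Java with no Kafka dependency. It is not the KIP's implementation. The names AggregationSketch, Bid, maxKeepingOlder, minKeepingOlder, fold and sum are all made up for illustration. It assumes the default proposed above (a value replaces the current min/max only when compare() is strictly non-zero in the right direction, so ties keep the older value) and the simplification of summing through Number.doubleValue(). The sum parameter is typed Function<? super V, ? extends Number> because Java generics are invariant: a pre-existing Function<V, Integer> would not be accepted where Function<V, Number> is declared, although a lambda would.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class AggregationSketch {

    /** Hypothetical value type for the auction example; not part of any API. */
    static final class Bid {
        final String client;
        final double amount;

        Bid(String client, double amount) {
            this.client = client;
            this.amount = amount;
        }
    }

    /** Reducer for max: the candidate wins only if strictly greater, so ties keep the older value. */
    static <V> BinaryOperator<V> maxKeepingOlder(Comparator<? super V> comp) {
        return (current, candidate) ->
                comp.compare(candidate, current) > 0 ? candidate : current;
    }

    /** Reducer for min: the candidate wins only if strictly smaller, so ties keep the older value. */
    static <V> BinaryOperator<V> minKeepingOlder(Comparator<? super V> comp) {
        return (current, candidate) ->
                comp.compare(candidate, current) < 0 ? candidate : current;
    }

    /** Folds values in arrival order, standing in for a per-key reduce. */
    static <V> V fold(List<V> values, BinaryOperator<V> reducer) {
        V result = null;
        for (V value : values) {
            result = (result == null) ? value : reducer.apply(result, value);
        }
        return result;
    }

    /** Sum fixed to double: any Number subtype is accepted and read via doubleValue(). */
    static <V> double sum(List<V> values, Function<? super V, ? extends Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        // Clients A and B bid the same amount; A's bid arrives first.
        List<Bid> bids = List.of(new Bid("A", 100.0), new Bid("B", 100.0), new Bid("C", 90.0));
        Comparator<Bid> byAmount = Comparator.comparingDouble(b -> b.amount);

        Bid max = fold(bids, maxKeepingOlder(byAmount));
        Bid min = fold(bids, minKeepingOlder(byAmount));
        // Prints "A C 290.0": the tie at 100.0 keeps the earlier bid from A.
        System.out.println(max.client + " " + min.client + " " + sum(bids, b -> b.amount));
    }
}
```

Since the result keeps the whole Bid, the numeric value can still be extracted afterwards, which is the role mapValues() plays in the discussion above. Flipping the strict comparisons to >= and <= would give the "newer value wins" variant instead.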