Alex,
On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote: > Mohan / Mathias, > > > > I think extending min/max to non-numeric types makes sense. Wondering > > > why we should require a `Comparator` or if we should require that the > > > types implement `Comparable` instead? > > > > > Good question. This is what it would look like: > > > > KTable<K, V> min_comparable() > > KTable<K, V> min_comparator(Comparator<V> comp) > > Not sure if I understood Mathias' proposal correctly, but I think that > instead of going with > your original proposal (<VR extends Number> KTable<K, VR> min(Function<V, > VR> func...) > or mine (KTable<K, V> min(Comparator<V> comparator...), we could simplify > it a > bit by using a function to extract a Comparable property from the original > value: > > KTable<K, V> min(Function<V, Comparable<?>> func...) > > I will let Matthias clarify. I am not sure why it is simpler than the comparator one. Comparable is implemented by the type and not sure exposing it this way makes it any better. > I also think, that min/max should not change the value type. Using > > `Long` for sum() make sense though, and also to require a `<V extends > > Number>`. > > Are there any reasons to limit the sum() to integers? Why not use a Double > instead? > > Yeah, if the precision is important, then we should stick with Double. -mohan Best regards, > Alexandre > > On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com> > wrote: > > > Matthias, > > > > On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax > <mj...@mailbox.org.invalid > > > > > wrote: > > > > > Hi, > > > > > > I think extending min/max to non-numeric types makes sense. Wondering > > > why we should require a `Comparator` or if we should require that the > > > types implement `Comparable` instead? > > > > > > Good question. This is what it would look like: > > > > KTable<K, V> min_comparable() > > KTable<K, V> min_comparator(Comparator<V> comp) > > > > For min_comparable to work, you still need the bounds "V extends > > Comparable< > > V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, > it > > has to be at the interface level like this: > > > > KStream<K, V extends Comparable<V>> > > > > which is a little messy unless there is a different way to do the same. > The > > comparator gives a simple way to define an anonymous function. > > > > What do you think ? > > > > > > > I also think, that min/max should not change the value type. Using > > > `Long` for sum() make sense though, and also to require a `<V extends > > > Number>`. > > > > > > I guess these are the two possibilities: > > > > <E extends Number> Long sum(Function<V, E> func) > > Long sum(Function<V, Number> func) > > > > Both should work. "func" can return any subtypes of Number and I don't > see > > any advantages with the first version. Can you clarify ? > > > > Thanks > > Mohan > > > > > > > > > > -Matthias > > > > > > On 6/8/21 5:00 PM, Mohan Parthasarathy wrote: > > > > Hi Alex, > > > > > > > > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil < > > > alexandre.bra...@gmail.com> > > > > wrote: > > > > > > > >> > > > >> My point here is that, when we're only interested in a max/min > numeric > > > >> value, it doesn't > > > >> matter when we have repeated values, since we'd be only forwarding > the > > > >> number downstream, > > > >> so I could disregard when the Comparator returns a zero value > (meaning > > > >> equals) and min/max > > > >> would still be semantically correct. But when we're forwarding the > > > original > > > >> object downstream > > > >> instead of its numeric property, it could mean different things > > > >> semantically depending on how > > > >> we handle the repeated values. > > > >> > > > >> As an example, if I were using max() on a stream of Biddings for > > > products > > > >> in an auction, the > > > >> order of the biddings would probably influence the winner if two > > clients > > > >> send Biddings with the > > > >> same value. If we're only forwarding the Bidding value downstream (a > > > double > > > >> value of 100, for > > > >> example), it doesn't matter how repeated values are handled, since > the > > > max > > > >> price for this > > > >> auction would still be 100.00, no matter what Bidding got selected > in > > > the > > > >> end. But if we're > > > >> forwarding the Biddings downstream instead, then it matters whether > > the > > > >> winning Bidding sent > > > >> downstream was originally posted by Client A or Client B. > > > >> > > > >> I'm not saying that an overloaded method to handle different options > > for > > > >> how repeated values > > > >> should be handled by min/max is mandatory, but it should be clear on > > the > > > >> methods' docs > > > >> what would happen when Comparator.compare() == 0. My preferred > option > > > for > > > >> the default > > > >> behaviour is to only forward a new value is smaller/bigger than the > > > >> previous min/max value > > > >> (ignoring compare() == 0), since it would emit less values > downstream > > > and > > > >> would be easier > > > >> to read ("I only send a value downstream if it's bigger/smaller than > > the > > > >> previously selected > > > >> value"). > > > >> > > > > Thanks for the clarification. I like your suggestion unless someone > > feels > > > > that they want an option to control this (i.e., when compare() == 0, > > > return > > > > the old value vs new value). > > > > > > > > > > > >> > > > >>> Not knowing the schema of the value (V) has its own set of > problems. > > > As I > > > >> have alluded to > > > >>> in the proposal, this is a little bit messy. We already have > "reduce" > > > >> which can be used to > > > >>> implement sum (mapValues().reduce()). > > > >>> Thinking about it more, do you think "sum" would be useful ? One > > hacky > > > >> way to implement > > > >>> this is to inspect the type of the return when the "func" is called > > the > > > >> first time OR infer from > > > >>> the materialized or have an explicit initializer. > > > >> > > > >> I think it might be useful for some use cases, yes, but it would be > > > tricky > > > >> to implement this in a > > > >> way that handles generic Numbers and keeps their original > > implementation > > > >> class. One > > > >> simplification you could take is fixating VR to be a Double, and > then > > > use > > > >> Number.doubleValue() > > > >> to compute the sum. > > > >> > > > > > > > > Yeah, that would simplify quite a bit. I think you are suggesting > this: > > > > > > > > KTable<K,Double> sum(Function<V, Number> func) > > > > > > > > > > > >> What you said about using reduce() to compute a sum() is also true > for > > > >> min() and max(). =) All > > > >> three methods in this KIP would be a syntactic sugar for what could > > > >> otherwise be implemented > > > >> using reduce/aggregate, but I see value in implementing them and > > > >> simplifying the adoption of > > > >> those use cases. > > > >> > > > >> Agreed. I seem to have forgotten the reason as to why I started this > > KIP > > > > :-). There is a long way to go. > > > > > > > > -thanks > > > > Mohan > > > > > > > > Best regards, > > > >> Alexandre > > > >> > > > >> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy < > > > mposde...@gmail.com> > > > >> wrote: > > > >> > > > >>> Hi Alex, > > > >>> > > > >>> Responses below. > > > >>> > > > >>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil < > > > >>> alexandre.bra...@gmail.com> > > > >>> wrote: > > > >>> > > > >>>> Hi Mohan, > > > >>>> > > > >>>> I like the idea of adding those methods to the API, but I'd like > to > > > >> make > > > >>> a > > > >>>> suggestion: > > > >>>> > > > >>>> Although the most used scenario for min() / max() might possibly > be > > > for > > > >>>> numeric values, I think they could also be > > > >>>> useful on other objects like Dates, LocalDates or Strings. Why > limit > > > >> the > > > >>>> API to Numbers only? > > > >>>> > > > >>>> > > > >>> There was no specific reason. Just addressing the common scenario. > > But > > > I > > > >>> don't see why this can't be supported given your suggestion below. > > > >>> > > > >>> Extending on the above, couldn't we change the API to provide a > > > >>>> Comparator<V> instead of the Function<V, VR> > > > >>>> for those methods, and make them return a KTable<K, V> instead? > Not > > > >> only > > > >>>> would this approach not limit the > > > >>>> usage of those methods to Numbers, but they'd also preserve the > > origin > > > >>> from > > > >>>> the min/max value [1]. The extraction of > > > >>>> a single (numeric?) value could be achieved by a subsequent > > > >> .mapValues() > > > >>>> operator, and this strategy would also > > > >>>> allow us to reuse the stream's current value serde on min / max, > > > making > > > >>> the > > > >>>> Materialized an optional parameter. > > > >>>> > > > >>>> I like your idea though it is odd that min/max returns KTable<K, > V> > > > >>> instead of the KTable<K, VR> (like in count), but mapValues should > do > > > the > > > >>> trick. > > > >>> > > > >>> One extra complication of this approach is that now we'd have to > > handle > > > >>>> repeated min/max values from different > > > >>>> origins (two semantically different objects for which the > comparator > > > >>>> returns 0), but we could solve that by adding > > > >>>> a parameter to specify whether to use the older or newer value (or > > > >>> assuming > > > >>>> one of these options as default for a > > > >>>> simpler API?). > > > >>>> > > > >>>> I am not sure whether this complexity is warranted. Why can't we > > just > > > >>> stick to the way a regular Comparator works ? Can you give me a > real > > > >> world > > > >>> example ? > > > >>> > > > >>>> > > > >>>> I know it's an implementation issue, but I'm curious on how you'd > > > solve > > > >>>> handling the <VR extends Number> on > > > >>>> the sum(). Since the multiple implementations of this interface > > don't > > > >>> have > > > >>>> a common constructor nor an interface > > > >>>> method to add two Numbers, would it be possible to implement sum() > > and > > > >>>> retain the original VR type on the > > > >>>> returned KTable? > > > >>>> > > > >>> > > > >>> Not knowing the schema of the value (V) has its own set of > problems. > > > As I > > > >>> have alluded to in the proposal, this is a little bit messy. We > > already > > > >>> have "reduce" which can be used to implement sum > > > (mapValues().reduce()). > > > >>> Thinking about it more, do you think "sum" would be useful ? One > > hacky > > > >> way > > > >>> to implement this is to inspect the type of the return when the > > "func" > > > is > > > >>> called the first time OR infer from the materialized or have an > > > explicit > > > >>> initializer. > > > >>> > > > >>> Thanks > > > >>> Mohan > > > >>> > > > >>> > > > >>>> [1]: An example scenario for this would be to find the min / max > > > >> Bidding > > > >>>> for a product where, at the end of the > > > >>>> auction, I need not only the min / max value of said Bidding, but > > also > > > >>> the > > > >>>> bidder's contact information. > > > >>>> > > > >>>> Best, > > > >>>> Alexandre > > > >>>> > > > >>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy < > > > >> mposde...@gmail.com> > > > >>>> wrote: > > > >>>> > > > >>>>> Hi, > > > >>>>> > > > >>>>> I have created a proposal for adding some additional aggregation > > APIs > > > >>>> like > > > >>>>> count. > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>> > > > >>> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs > > > >>>>> > > > >>>>> I have noted down some of the issues that need discussion. Thanks > > to > > > >>>>> Matthias for helping me with the scope of the proposal. > > > >>>>> > > > >>>>> Thanks > > > >>>>> Mohan > > > >>>>> > > > >>>> > > > >>> > > > >> > > > > > > > > > >