I was playing with the code a little bit, but it seems it's not easy to use generics to enforce that V is `Comparable`...
We would need to introduce a new interface:

    interface ComparableStream<K, V extends Comparable<V>> extends KStream<K, V> {
        KTable<K, V> min();
    }

But it also requires a nasty cast to actually use it:

    KStream<String, String> stream = new StreamsBuilder().stream("");
    // The cast type-checks only because String implements Comparable<String>.
    KTable<String, String> table = ((ComparableStream<String, String>) stream).min();

If the value type does not implement `Comparable`, the cast would not compile... Or would there be a simpler way to ensure that min() can only be called _if_ V is `Comparable`?

So maybe passing in a `Comparator<V>` is the right way to go; it might also be more flexible anyway. My original idea was just to avoid the `Comparator` argument, as it would make the API nicer IMHO; fewer parameters is usually better...

I am not sure why we would want to pass `Function<V, Comparable<?>> func` into `min()`?

-Matthias

On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> Alex,
>
> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>
>> Mohan / Matthias,
>>
>>>> I think extending min/max to non-numeric types makes sense. Wondering why we should require a `Comparator` or if we should require that the types implement `Comparable` instead?
>>>>
>>> Good question. This is what it would look like:
>>>
>>>     KTable<K, V> min_comparable()
>>>     KTable<K, V> min_comparator(Comparator<V> comp)
>>
>> Not sure if I understood Matthias' proposal correctly, but I think that instead of going with your original proposal (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...)) or mine (KTable<K, V> min(Comparator<V> comparator...)), we could simplify it a bit by using a function to extract a Comparable property from the original value:
>>
>>     KTable<K, V> min(Function<V, Comparable<?>> func...)
>>
> I will let Matthias clarify. I am not sure why it is simpler than the comparator one. Comparable is implemented by the type, and I am not sure exposing it this way makes it any better.
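On the question of whether there is a simpler way than the cast, and on the `Function<V, Comparable<?>>` variant: the plain-Java sketch below (no Kafka Streams types; `min`, `minBy` and the `Bid` record are hypothetical names for illustration only) suggests that a single `Comparator`-based signature can cover both cases. Callers with `Comparable` values pass `Comparator.naturalOrder()`, which only compiles when the element type is `Comparable`, and callers who want to compare on an extracted property use `Comparator.comparing(...)`, a type-checked form of the key-extractor idea. With a raw `Comparable<?>` key, by contrast, the keys could not be passed to `compareTo()` without an unchecked cast.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class ComparatorMinSketch {

    // Hypothetical value type, used only for illustration.
    record Bid(String bidder, double amount) {}

    // Comparator-based min: accepts any V, Comparable or not.
    // Only a strictly smaller value replaces the current one, so ties keep the earlier value.
    static <V> V min(List<V> values, Comparator<? super V> comparator) {
        V current = null;
        for (V v : values) {
            if (current == null || comparator.compare(v, current) < 0) {
                current = v;
            }
        }
        return current; // null for an empty list
    }

    // Key-extractor variant: the compiler checks that the extracted key is Comparable.
    static <V, C extends Comparable<? super C>> V minBy(List<V> values,
                                                        Function<? super V, ? extends C> key) {
        return min(values, Comparator.comparing(key));
    }

    public static void main(String[] args) {
        List<String> names = List.of("carol", "alice", "bob");
        // Compiles only because String implements Comparable<String>.
        System.out.println(min(names, Comparator.naturalOrder())); // alice

        List<Bid> bids = List.of(new Bid("A", 100.0), new Bid("B", 90.0));
        System.out.println(minBy(bids, Bid::amount).bidder()); // B
    }
}
```

In this sketch the "is V Comparable?" check moves from the stream's type parameter to the argument, so no interface-level bound or cast is needed.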
>
>>>> I also think that min/max should not change the value type. Using `Long` for sum() makes sense, though, and so does requiring `<V extends Number>`.
>>
>> Are there any reasons to limit sum() to integers? Why not use a Double instead?
>>
> Yeah, if the precision is important, then we should stick with Double.
>
> -mohan
>
>> Best regards,
>> Alexandre
>>
>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>
>>> Matthias,
>>>
>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think extending min/max to non-numeric types makes sense. Wondering why we should require a `Comparator` or if we should require that the types implement `Comparable` instead?
>>>>
>>> Good question. This is what it would look like:
>>>
>>>     KTable<K, V> min_comparable()
>>>     KTable<K, V> min_comparator(Comparator<V> comp)
>>>
>>> For min_comparable to work, you still need the bound "V extends Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, it has to be at the interface level like this:
>>>
>>>     KStream<K, V extends Comparable<V>>
>>>
>>> which is a little messy unless there is a different way to do the same. The comparator gives a simple way to define an anonymous function.
>>>
>>> What do you think?
>>>
>>>> I also think that min/max should not change the value type. Using `Long` for sum() makes sense, though, and so does requiring `<V extends Number>`.
>>>>
>>> I guess these are the two possibilities:
>>>
>>>     <E extends Number> Long sum(Function<V, E> func)
>>>     Long sum(Function<V, Number> func)
>>>
>>> Both should work. "func" can return any subtype of Number, and I don't see any advantage in the first version. Can you clarify?
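On the two `sum()` signatures: one practical difference is that Java generics are invariant, so a `Function<String, Integer>` held in a variable is not a `Function<String, Number>`, whereas the `<E extends Number>` form accepts it. Lambdas written inline fit either form. The plain-Java sketch below assumes the result is fixed to `double` via `Number.doubleValue()`, as suggested further down the thread; `sumNumber` and `sumBounded` are hypothetical names. Note that a `double` accumulator cannot represent every `long` exactly beyond 2^53, so an integral variant might still be worth keeping.

```java
import java.util.List;
import java.util.function.Function;

public class SumSignatureSketch {

    // Second option: invariant Function<V, Number>.
    static <V> double sumNumber(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V v : values) {
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    // First option: bounded type parameter for the function's result.
    // Accepts Function<V, Integer>, Function<V, Long>, etc.; a wildcard
    // Function<? super V, ? extends Number> would also accept functions declared on a supertype of V.
    static <V, E extends Number> double sumBounded(List<V> values, Function<V, E> func) {
        double total = 0.0;
        for (V v : values) {
            // Number has no add(); doubleValue() is the common denominator.
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> words = List.of("kafka", "streams", "kip");
        Function<String, Integer> length = String::length;

        // An inline lambda fits both signatures (int is boxed, then widened to Number).
        System.out.println(sumNumber(words, s -> s.length()));  // 15.0
        System.out.println(sumBounded(words, s -> s.length())); // 15.0

        // sumNumber(words, length); // does not compile: Function<String, Integer> is not a Function<String, Number>
        System.out.println(sumBounded(words, length));          // 15.0
    }
}
```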
>>>
>>> Thanks
>>> Mohan
>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
>>>>> Hi Alex,
>>>>>
>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>
>>>>>> My point here is that, when we're only interested in a max/min numeric value, repeated values don't matter, since we'd only be forwarding the number downstream, so I could disregard the case where the Comparator returns zero (meaning equal) and min/max would still be semantically correct. But when we're forwarding the original object downstream instead of its numeric property, it could mean different things semantically depending on how we handle the repeated values.
>>>>>>
>>>>>> As an example, if I were using max() on a stream of Biddings for products in an auction, the order of the Biddings would probably influence the winner if two clients sent Biddings with the same value. If we're only forwarding the Bidding value downstream (a double value of 100, for example), it doesn't matter how repeated values are handled, since the max price for this auction would still be 100.00, no matter which Bidding got selected in the end. But if we're forwarding the Biddings downstream instead, then it matters whether the winning Bidding sent downstream was originally posted by Client A or Client B.
>>>>>>
>>>>>> I'm not saying that an overloaded method to handle different options for how repeated values are treated by min/max is mandatory, but the methods' docs should make clear what happens when Comparator.compare() == 0.
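What happens on `compare() == 0` comes down to whether the comparison inside the reducer is strict. The plain-Java sketch below assumes a reducer-style `(current, next)` function, similar in shape to the `Reducer<V>` passed to `reduce()` in Kafka Streams; `maxKeepOlder`, `maxKeepNewer`, `fold` and `Bid` are hypothetical names. With two equal bids, the strict comparison keeps the earlier bidder and the non-strict one keeps the later bidder.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

public class TieHandlingSketch {

    // Hypothetical value type: two bids can share an amount but differ in bidder.
    record Bid(String bidder, double amount) {}

    // Replaces the current max only if the new value is strictly greater,
    // so on compare() == 0 the earlier record wins.
    static <V> BinaryOperator<V> maxKeepOlder(Comparator<? super V> cmp) {
        return (current, next) -> cmp.compare(next, current) > 0 ? next : current;
    }

    // Replaces the current max if the new value is greater or equal,
    // so on compare() == 0 the later record wins.
    static <V> BinaryOperator<V> maxKeepNewer(Comparator<? super V> cmp) {
        return (current, next) -> cmp.compare(next, current) >= 0 ? next : current;
    }

    // Applies the reducer in arrival order; assumes a non-empty list.
    static <V> V fold(List<V> values, BinaryOperator<V> reducer) {
        V acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = reducer.apply(acc, values.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        Comparator<Bid> byAmount = Comparator.comparingDouble(Bid::amount);
        List<Bid> bids = List.of(new Bid("A", 100.0), new Bid("B", 100.0));
        System.out.println(fold(bids, maxKeepOlder(byAmount)).bidder()); // A
        System.out.println(fold(bids, maxKeepNewer(byAmount)).bidder()); // B
    }
}
```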
>>>>>>
>>>>>> My preferred option for the default behaviour is to only forward a new value if it is smaller/bigger than the previous min/max value (ignoring compare() == 0), since it would emit fewer values downstream and would be easier to read ("I only send a value downstream if it's bigger/smaller than the previously selected value").
>>>>>>
>>>>> Thanks for the clarification. I like your suggestion unless someone feels that they want an option to control this (i.e., when compare() == 0, return the old value vs. the new value).
>>>>>
>>>>>>
>>>>>>> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to in the proposal, this is a little bit messy. We already have "reduce" which can be used to implement sum (mapValues().reduce()). Thinking about it more, do you think "sum" would be useful? One hacky way to implement this is to inspect the type of the return when the "func" is called the first time OR infer from the materialized or have an explicit initializer.
>>>>>>
>>>>>> I think it might be useful for some use cases, yes, but it would be tricky to implement this in a way that handles generic Numbers and keeps their original implementation class. One simplification you could take is fixing VR to Double, and then using Number.doubleValue() to compute the sum.
>>>>>>
>>>>>
>>>>> Yeah, that would simplify quite a bit. I think you are suggesting this:
>>>>>
>>>>>     KTable<K, Double> sum(Function<V, Number> func)
>>>>>
>>>>>> What you said about using reduce() to compute a sum() is also true for min() and max(). =) All three methods in this KIP would be syntactic sugar for what could otherwise be implemented using reduce/aggregate, but I see value in implementing them and simplifying the adoption of those use cases.
>>>>>>
>>>>> Agreed. I seem to have forgotten the reason why I started this KIP :-). There is a long way to go.
>>>>>
>>>>> -thanks
>>>>> Mohan
>>>>>
>>>>>> Best regards,
>>>>>> Alexandre
>>>>>>
>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Alex,
>>>>>>>
>>>>>>> Responses below.
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Mohan,
>>>>>>>>
>>>>>>>> I like the idea of adding those methods to the API, but I'd like to make a suggestion:
>>>>>>>>
>>>>>>>> Although the most common scenario for min() / max() is probably numeric values, I think they could also be useful on other objects like Dates, LocalDates or Strings. Why limit the API to Numbers only?
>>>>>>>>
>>>>>>> There was no specific reason. Just addressing the common scenario. But I don't see why this can't be supported given your suggestion below.
>>>>>>>
>>>>>>>> Extending on the above, couldn't we change the API to take a Comparator<V> instead of the Function<V, VR> for those methods, and make them return a KTable<K, V> instead? Not only would this approach not limit the usage of those methods to Numbers, but it would also preserve the origin of the min/max value [1]. The extraction of a single (numeric?) value could be achieved by a subsequent .mapValues() operator, and this strategy would also allow us to reuse the stream's current value serde on min / max, making the Materialized an optional parameter.
>>>>>>>>
>>>>>>> I like your idea, though it is odd that min/max returns KTable<K, V> instead of KTable<K, VR> (like in count), but mapValues should do the trick.
>>>>>>>
>>>>>>>> One extra complication of this approach is that now we'd have to handle repeated min/max values from different origins (two semantically different objects for which the comparator returns 0), but we could solve that by adding a parameter to specify whether to use the older or newer value (or assuming one of these options as the default for a simpler API?).
>>>>>>>>
>>>>>>> I am not sure whether this complexity is warranted. Why can't we just stick to the way a regular Comparator works? Can you give me a real-world example?
>>>>>>>
>>>>>>>> I know it's an implementation issue, but I'm curious how you'd handle the <VR extends Number> on sum(). Since the multiple implementations of this interface have neither a common constructor nor an interface method to add two Numbers, would it be possible to implement sum() and retain the original VR type on the returned KTable?
>>>>>>>>
>>>>>>> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to in the proposal, this is a little bit messy. We already have "reduce" which can be used to implement sum (mapValues().reduce()). Thinking about it more, do you think "sum" would be useful? One hacky way to implement this is to inspect the type of the return when the "func" is called the first time OR infer from the materialized or have an explicit initializer.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Mohan
>>>>>>>
>>>>>>>> [1]: An example scenario for this would be to find the min / max Bidding for a product where, at the end of the auction, I need not only the min / max value of said Bidding, but also the bidder's contact information.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Alexandre
>>>>>>>>
>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I have created a proposal for adding some additional aggregation APIs like count.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
>>>>>>>>>
>>>>>>>>> I have noted down some of the issues that need discussion. Thanks to Matthias for helping me with the scope of the proposal.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Mohan
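The suggestion in this thread to have min/max return `KTable<K, V>` and extract the number with a later `mapValues()` (the auction scenario in footnote [1]) can be illustrated without a Kafka cluster. In Kafka Streams terms this would roughly be `groupByKey().reduce(...)` followed by `mapValues(...)`; the plain-Java sketch below only simulates that per-key behaviour with `Map.merge`, and the `Bid` and `Keyed` records, product keys and method names are hypothetical. Keeping the whole `Bid` in the aggregate preserves the bidder, while the follow-up mapping yields just the amount.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaxThenMapValuesSketch {

    // Hypothetical value type: the bidder is the "origin" worth preserving.
    record Bid(String bidder, double amount) {}

    // Hypothetical keyed record, standing in for a (key, value) pair on a stream.
    record Keyed(String product, Bid bid) {}

    // Per-key max that keeps the whole value; strict '>' keeps the earlier bid on ties.
    static Map<String, Bid> maxPerKey(List<Keyed> records) {
        Map<String, Bid> table = new LinkedHashMap<>();
        for (Keyed r : records) {
            // merge() calls the function with (existing, new) when the key is already present.
            table.merge(r.product(), r.bid(),
                    (current, next) -> next.amount() > current.amount() ? next : current);
        }
        return table;
    }

    // Counterpart of a subsequent mapValues(): keep only the numeric property.
    static Map<String, Double> amounts(Map<String, Bid> table) {
        Map<String, Double> result = new LinkedHashMap<>();
        table.forEach((product, bid) -> result.put(product, bid.amount()));
        return result;
    }

    public static void main(String[] args) {
        List<Keyed> stream = List.of(
                new Keyed("lamp", new Bid("A", 40.0)),
                new Keyed("chair", new Bid("B", 25.0)),
                new Keyed("lamp", new Bid("C", 55.0)),
                new Keyed("chair", new Bid("D", 25.0)));

        Map<String, Bid> winners = maxPerKey(stream);
        System.out.println(winners.get("lamp").bidder());  // C
        System.out.println(winners.get("chair").bidder()); // B (tie: earlier bid kept)
        System.out.println(amounts(winners));              // {lamp=55.0, chair=25.0}
    }
}
```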