@Mohan: For sum(), I actually think that the return type should be the same as the input type. In the end, sum() is a special case of reduce().
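Here is a minimal plain-Java sketch of that point, with no Kafka dependency. A per-key running sum whose result type equals the input type is simply a reduce with an addition operator. `RunningReduce`, `process()` and `intSum()` are hypothetical names. The class only mimics how a per-key reduce forwards the updated aggregate for each incoming record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a per-key reduce(); this is not the Kafka Streams API.
final class RunningReduce<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final BinaryOperator<V> reducer;

    RunningReduce(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Folds the value into the key's current aggregate and returns the updated
    // aggregate, i.e. what a KTable-style reduce would forward downstream.
    V process(K key, V value) {
        return state.merge(key, value, reducer);
    }

    // sum() as a special case of reduce(): Integer in, Integer out.
    static <K> RunningReduce<K, Integer> intSum() {
        return new RunningReduce<>(Integer::sum);
    }

    public static void main(String[] args) {
        RunningReduce<String, Integer> sum = RunningReduce.intSum();
        List<Integer> forwarded = new ArrayList<>();
        forwarded.add(sum.process("a", 3));
        forwarded.add(sum.process("b", 10));
        forwarded.add(sum.process("a", 4));
        System.out.println(forwarded); // [3, 10, 7]
    }
}
```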
@Alexandre: Not sure if using a `Function` to get a `Comparable` is simpler than implementing a `Comparator`? Can you elaborate on this point? In the end, using a `Comparator` seems to provide the highest flexibility?

-Matthias

On 8/15/21 10:27 AM, Alexandre Brasil wrote:
>> I am not sure why we would want to pass `Function<V, Comparable<?>> func` into `min()`?
>
> I guess I misread/misunderstood your earlier suggestion.
>
> My line of thought was that, instead of using a method signature that demands a Comparator<V> in min()/max(), we might use a property extractor (like the FK extractors on some join() overloads) to return a Comparable property that min()/max() could use to compare the values.
>
> The benefit of this approach is that it would be simpler than implementing comparators when most use cases would probably compare properties of the values that already implement Comparable (like Numbers, Strings, Dates, etc.), but on the other hand it would be more limiting, since it would disallow using multiple properties of <V> or defining how null property values should be handled.
>
> On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> I was playing with the code a little bit, but it seems not to be easy to use generics to enforce that V is `Comparable`...
>>
>> We would need to introduce a new interface
>>
>>   interface ComparableStream<K, V extends Comparable<V>>
>>       extends KStream<K, V>
>>   {
>>       KTable<K, V> min();
>>   }
>>
>> But it also requires a nasty cast to actually use it:
>>
>>   KStream<String, String> stream =
>>       new StreamsBuilder().stream("");
>>   KTable<String, String> table =
>>       ((ComparableStream<String, String>) stream).min();
>>
>> If the value type does not implement `Comparable`, the cast would not compile... Or would there be a simpler way to ensure that min() can only be called _if_ V is `Comparable`?
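A `Comparator<V>` parameter arguably already covers the property-extractor idea. The JDK's `Comparator.comparing` turns an extractor into a comparator, `thenComparing*` combines several properties, and `Comparator.nullsLast`/`nullsFirst` make null handling explicit. Types that are already `Comparable` can simply pass `Comparator.naturalOrder()`, so no bound on V is needed. The plain-Java sketch below assumes a hypothetical `Bidding` type and a hypothetical list-based `min()` helper. It does not use Kafka Streams.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class ComparatorMin {
    // Hypothetical value type for illustration only (not part of KIP-747).
    static final class Bidding {
        private final String bidder;
        private final Double amount; // may be null
        private final long timestamp;

        Bidding(String bidder, Double amount, long timestamp) {
            this.bidder = bidder;
            this.amount = amount;
            this.timestamp = timestamp;
        }

        String bidder() { return bidder; }
        Double amount() { return amount; }
        long timestamp() { return timestamp; }
    }

    // Hypothetical min(): taking a Comparator<V> needs no bound on V at all.
    static <V> Optional<V> min(List<V> values, Comparator<? super V> comparator) {
        return values.stream().min(comparator);
    }

    // A property extractor turned into a Comparator; null amounts sort last.
    static Comparator<Bidding> byAmount() {
        return Comparator.comparing(Bidding::amount,
                Comparator.nullsLast(Comparator.<Double>naturalOrder()));
    }

    // Several properties: amount first, then the earlier timestamp wins ties.
    static Comparator<Bidding> byAmountThenTime() {
        return byAmount().thenComparingLong(Bidding::timestamp);
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(
                new Bidding("A", 100.0, 2L),
                new Bidding("B", 90.0, 1L),
                new Bidding("C", null, 3L));
        System.out.println(min(bids, byAmount()).get().bidder()); // B

        List<Bidding> tied = List.of(new Bidding("X", 50.0, 2L), new Bidding("Y", 50.0, 1L));
        System.out.println(min(tied, byAmountThenTime()).get().bidder()); // Y

        // Types that already implement Comparable simply pass natural order.
        System.out.println(min(List.of("pear", "apple"), Comparator.<String>naturalOrder()).get()); // apple
    }
}
```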
>>
>> So maybe passing in a `Comparator<V>` might be the right way to go; it might also be more flexible anyway. -- My original idea was just to maybe avoid the `Comparator` argument, as it would make the API nicer IMHO; fewer parameters is usually better...
>>
>> I am not sure why we would want to pass `Function<V, Comparable<?>> func` into `min()`?
>>
>> -Matthias
>>
>> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
>>> Alex,
>>>
>>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>
>>>> Mohan / Matthias,
>>>>
>>>>>> I think extending min/max to non-numeric types makes sense. Wondering why we should require a `Comparator` or if we should require that the types implement `Comparable` instead?
>>>>>>
>>>>> Good question. This is what it would look like:
>>>>>
>>>>>   KTable<K, V> min_comparable()
>>>>>   KTable<K, V> min_comparator(Comparator<V> comp)
>>>>
>>>> Not sure if I understood Matthias' proposal correctly, but I think that instead of going with your original proposal (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...)) or mine (KTable<K, V> min(Comparator<V> comparator...)), we could simplify it a bit by using a function to extract a Comparable property from the original value:
>>>>
>>>>   KTable<K, V> min(Function<V, Comparable<?>> func...)
>>>>
>>> I will let Matthias clarify. I am not sure why it is simpler than the comparator one. Comparable is implemented by the type, and I am not sure exposing it this way makes it any better.
>>>
>>>>> I also think that min/max should not change the value type. Using `Long` for sum() makes sense though, and also to require a `<V extends Number>`.
>>>>
>>>> Are there any reasons to limit the sum() to integers? Why not use a Double instead?
>>>>
>>> Yeah, if the precision is important, then we should stick with Double.
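The Double-based variant discussed in this thread is a `sum(Function<V, Number> func)` that widens every value with `Number.doubleValue()`. Below is a sketch of it as a hypothetical list-based helper rather than the actual Kafka Streams API:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helper mirroring "KTable<K, Double> sum(Function<V, Number> func)"
// over a plain list; this is not the Kafka Streams API.
final class DoubleSum {
    static <V> double sum(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V value : values) {
            // Any Number subtype works because only doubleValue() is used.
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Number> mixed = List.of(1, 2L, 0.5f, 1.25);
        System.out.println(sum(mixed, n -> n)); // 4.75

        // Caveat: a double holds integers exactly only up to 2^53,
        // so 2^53 + 1 is rounded to 2^53.
        long big = (1L << 53) + 1;
        System.out.println(sum(List.of(big), n -> n) == (double) (1L << 53)); // true
    }
}
```

Keep one design caveat in mind: `double` represents every integer exactly only up to 2^53. Summing large `long` values this way can therefore round, as the second call shows.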
>>>
>>> -mohan
>>>
>>>> Best regards,
>>>> Alexandre
>>>>
>>>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>
>>>>> Matthias,
>>>>>
>>>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think extending min/max to non-numeric types makes sense. Wondering why we should require a `Comparator` or if we should require that the types implement `Comparable` instead?
>>>>>>
>>>>> Good question. This is what it would look like:
>>>>>
>>>>>   KTable<K, V> min_comparable()
>>>>>   KTable<K, V> min_comparator(Comparator<V> comp)
>>>>>
>>>>> For min_comparable to work, you still need the bound "V extends Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V" warning, it has to be at the interface level like this:
>>>>>
>>>>>   KStream<K, V extends Comparable<V>>
>>>>>
>>>>> which is a little messy unless there is a different way to do the same. The comparator gives a simple way to define an anonymous function.
>>>>>
>>>>> What do you think?
>>>>>
>>>>>> I also think that min/max should not change the value type. Using `Long` for sum() makes sense though, and also to require a `<V extends Number>`.
>>>>>>
>>>>> I guess these are the two possibilities:
>>>>>
>>>>>   <E extends Number> Long sum(Function<V, E> func)
>>>>>   Long sum(Function<V, Number> func)
>>>>>
>>>>> Both should work. "func" can return any subtype of Number, and I don't see any advantage to the first version. Can you clarify?
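To illustrate the question about the two `sum()` signatures, here is a hypothetical list-based version of each. `sumBounded` and `sumNumber` are invented names, not Kafka Streams methods. Both accept an inline lambda. However, Java generics are invariant, so an existing `Function<String, Integer>` variable is not a `Function<String, Number>` and can only be passed to the bounded version.

```java
import java.util.List;
import java.util.function.Function;

final class SumSignatures {
    // Variant 1 from the thread, made concrete over a list (hypothetical helper).
    // longValue() truncates fractional values, matching a Long result.
    static <V, E extends Number> long sumBounded(List<V> values, Function<V, E> func) {
        long total = 0L;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    // Variant 2 from the thread (hypothetical helper).
    static <V> long sumNumber(List<V> values, Function<V, Number> func) {
        long total = 0L;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> words = List.of("kafka", "streams");

        // With a lambda or method reference written inline, both variants compile.
        System.out.println(sumBounded(words, String::length)); // 12
        System.out.println(sumNumber(words, s -> s.length()));  // 12

        // With an existing Function<String, Integer> variable, only variant 1 compiles:
        // Function<String, Integer> is not a subtype of Function<String, Number>.
        Function<String, Integer> length = String::length;
        System.out.println(sumBounded(words, length)); // 12
        // sumNumber(words, length); // does not compile
    }
}
```

A parameter typed `Function<V, ? extends Number>` would accept both forms without the extra type parameter.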
>>>>>
>>>>> Thanks
>>>>> Mohan
>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
>>>>>>> Hi Alex,
>>>>>>>
>>>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My point here is that, when we're only interested in a max/min numeric value, it doesn't matter when we have repeated values, since we'd only be forwarding the number downstream, so I could disregard the case where the Comparator returns zero (meaning equal) and min/max would still be semantically correct. But when we're forwarding the original object downstream instead of its numeric property, it could mean different things semantically depending on how we handle the repeated values.
>>>>>>>>
>>>>>>>> As an example, if I were using max() on a stream of Biddings for products in an auction, the order of the Biddings would probably influence the winner if two clients send Biddings with the same value. If we're only forwarding the Bidding value downstream (a double value of 100, for example), it doesn't matter how repeated values are handled, since the max price for this auction would still be 100.00, no matter what Bidding got selected in the end. But if we're forwarding the Biddings downstream instead, then it matters whether the winning Bidding sent downstream was originally posted by Client A or Client B.
>>>>>>>>
>>>>>>>> I'm not saying that an overloaded method to handle different options for how repeated values should be handled by min/max is mandatory, but the methods' docs should make clear what happens when Comparator.compare() == 0.
>>>>>>>> My preferred option for the default behaviour is to only forward a new value if it is smaller/bigger than the previous min/max value (ignoring compare() == 0), since it would emit fewer values downstream and would be easier to read ("I only send a value downstream if it's bigger/smaller than the previously selected value").
>>>>>>>>
>>>>>>> Thanks for the clarification. I like your suggestion unless someone feels that they want an option to control this (i.e., when compare() == 0, return the old value vs. the new value).
>>>>>>>
>>>>>>>>
>>>>>>>>> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to in the proposal, this is a little bit messy. We already have "reduce" which can be used to implement sum (mapValues().reduce()).
>>>>>>>>> Thinking about it more, do you think "sum" would be useful? One hacky way to implement this is to inspect the type of the return when the "func" is called the first time OR infer from the materialized or have an explicit initializer.
>>>>>>>>
>>>>>>>> I think it might be useful for some use cases, yes, but it would be tricky to implement this in a way that handles generic Numbers and keeps their original implementation class. One simplification you could take is fixing VR to be a Double, and then using Number.doubleValue() to compute the sum.
>>>>>>>>
>>>>>>>
>>>>>>> Yeah, that would simplify quite a bit. I think you are suggesting this:
>>>>>>>
>>>>>>>   KTable<K, Double> sum(Function<V, Number> func)
>>>>>>>
>>>>>>>> What you said about using reduce() to compute a sum() is also true for min() and max().
>>>>>>>> =) All three methods in this KIP would be syntactic sugar for what could otherwise be implemented using reduce/aggregate, but I see value in implementing them and simplifying the adoption of those use cases.
>>>>>>>>
>>>>>>> Agreed. I seem to have forgotten the reason why I started this KIP :-). There is a long way to go.
>>>>>>>
>>>>>>> -thanks
>>>>>>> Mohan
>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Alexandre
>>>>>>>>
>>>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Alex,
>>>>>>>>>
>>>>>>>>> Responses below.
>>>>>>>>>
>>>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mohan,
>>>>>>>>>>
>>>>>>>>>> I like the idea of adding those methods to the API, but I'd like to make a suggestion:
>>>>>>>>>>
>>>>>>>>>> Although the most common scenario for min() / max() might well be numeric values, I think they could also be useful on other objects like Dates, LocalDates or Strings. Why limit the API to Numbers only?
>>>>>>>>>>
>>>>>>>>> There was no specific reason; I was just addressing the common scenario. But I don't see why this can't be supported given your suggestion below.
>>>>>>>>>
>>>>>>>>>> Extending on the above, couldn't we change the API to provide a Comparator<V> instead of the Function<V, VR> for those methods, and make them return a KTable<K, V> instead? Not only would this approach not limit the usage of those methods to Numbers, but it would also preserve the origin of the min/max value [1]. The extraction of a single (numeric?)
>>>>>>>>>> value could be achieved by a subsequent .mapValues() operator, and this strategy would also allow us to reuse the stream's current value serde on min / max, making the Materialized an optional parameter.
>>>>>>>>>>
>>>>>>>>> I like your idea, though it is odd that min/max returns KTable<K, V> instead of KTable<K, VR> (like in count), but mapValues should do the trick.
>>>>>>>>>
>>>>>>>>>> One extra complication of this approach is that now we'd have to handle repeated min/max values from different origins (two semantically different objects for which the comparator returns 0), but we could solve that by adding a parameter to specify whether to use the older or newer value (or by assuming one of these options as the default for a simpler API?).
>>>>>>>>>>
>>>>>>>>> I am not sure whether this complexity is warranted. Why can't we just stick to the way a regular Comparator works? Can you give me a real-world example?
>>>>>>>>>
>>>>>>>>>> I know it's an implementation issue, but I'm curious how you'd solve handling the <VR extends Number> on the sum(). Since the multiple implementations of this interface have neither a common constructor nor an interface method to add two Numbers, would it be possible to implement sum() and retain the original VR type on the returned KTable?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Not knowing the schema of the value (V) has its own set of problems. As I have alluded to in the proposal, this is a little bit messy. We already have "reduce" which can be used to implement sum (mapValues().reduce()).
>>>>>>>>> Thinking about it more, do you think "sum" would be useful?
>>>>>>>>> One hacky way to implement this is to inspect the type of the return when the "func" is called the first time OR infer from the materialized or have an explicit initializer.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Mohan
>>>>>>>>>
>>>>>>>>>> [1]: An example scenario for this would be to find the min / max Bidding for a product where, at the end of the auction, I need not only the min / max value of said Bidding, but also the bidder's contact information.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Alexandre
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I have created a proposal for adding some additional aggregation APIs like count.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
>>>>>>>>>>>
>>>>>>>>>>> I have noted down some of the issues that need discussion. Thanks to Matthias for helping me with the scope of the proposal.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Mohan
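The thread raised a tie-handling question: what should min/max do when Comparator.compare() == 0? Below is a minimal plain-Java sketch of Alexandre's preferred default, which forwards only a strictly bigger value, alongside the alternative of taking the newer value on ties. `TieHandling`, `OnTie`, `Bid` and `RunningMax` are hypothetical names. This is not how Kafka Streams implements max().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TieHandling {
    // Hypothetical tie policy, modelled on the option floated in the thread.
    enum OnTie { KEEP_OLDER, TAKE_NEWER }

    // Hypothetical value type for illustration.
    static final class Bid {
        private final String bidder;
        private final int amount;

        Bid(String bidder, int amount) {
            this.bidder = bidder;
            this.amount = amount;
        }

        String bidder() { return bidder; }
        int amount() { return amount; }
    }

    // Hypothetical per-key running max; not the Kafka Streams API.
    static final class RunningMax<K, V> {
        private final Map<K, V> current = new HashMap<>();
        private final List<V> forwarded = new ArrayList<>();
        private final Comparator<? super V> comparator;
        private final OnTie onTie;

        RunningMax(Comparator<? super V> comparator, OnTie onTie) {
            this.comparator = comparator;
            this.onTie = onTie;
        }

        // Assumes non-null values. Only a changed max is forwarded downstream.
        void process(K key, V value) {
            V old = current.get(key);
            int cmp = (old == null) ? 1 : comparator.compare(value, old);
            boolean replace = cmp > 0 || (cmp == 0 && onTie == OnTie.TAKE_NEWER);
            if (replace) {
                current.put(key, value);
                forwarded.add(value);
            }
        }

        V get(K key) { return current.get(key); }

        List<V> forwarded() { return forwarded; }
    }

    public static void main(String[] args) {
        Comparator<Bid> byAmount = Comparator.comparingInt(Bid::amount);
        for (OnTie policy : OnTie.values()) {
            RunningMax<String, Bid> max = new RunningMax<>(byAmount, policy);
            max.process("auction-1", new Bid("A", 100));
            max.process("auction-1", new Bid("B", 100)); // same amount: a tie
            System.out.println(policy + ": winner=" + max.get("auction-1").bidder()
                    + ", forwarded=" + max.forwarded().size());
        }
        // KEEP_OLDER: winner=A, forwarded=1
        // TAKE_NEWER: winner=B, forwarded=2
    }
}
```

With KEEP_OLDER, the first Bidding to reach the maximum keeps winning and ties are not forwarded. This matches the "emit fewer values downstream" behaviour described in the June 8 message.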