@Mohan:

For sum(), I actually think that the return type should be the same as the
input type. In the end, sum() is a special case of reduce().

@Alexandre:

I am not sure that using a `Function` to get a `Comparable` is simpler than
implementing a `Comparator`. Can you elaborate on this point? In the end,
using a `Comparator` seems to provide the highest flexibility.


-Matthias

On 8/15/21 10:27 AM, Alexandre Brasil wrote:
>> I am not sure why we would want to pass `Function<V, Comparable<?>>
>> func` into `min()`?
> 
> I guess I misread/misunderstood your earlier suggestion.
> 
> My line of thought was that, instead of using a method signature that
> demands a Comparator<V> in min()/max(), we might use a property extractor
> (like the FK extractors on some join() overloads) to return a Comparable
> property that min()/max() could use to compare the values.
> 
> The benefit of this approach is that it would be simpler than implementing
> comparators, since most use cases would probably compare properties of the
> values that already implement Comparable (like Numbers, Strings, Dates,
> etc.). On the other hand, it would be more limiting, since it disallows
> using multiple properties of <V> and defining how null property values
> should be handled.
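A minimal sketch in plain Java (no Kafka Streams types; the `Bid` record is hypothetical) of how the two styles relate: the JDK's `Comparator.comparing` turns a property extractor into a `Comparator`, and `thenComparing` plus `Comparator.nullsFirst` cover the multi-property and null-handling cases that a single extractor cannot express.

```java
import java.util.Comparator;
import java.util.List;

class ExtractorVsComparator {
    // Hypothetical value type; the bidder name may be null.
    record Bid(String bidder, Double amount) {}

    // Extractor style: a Comparable property is turned into a Comparator.
    static final Comparator<Bid> BY_AMOUNT = Comparator.comparing(Bid::amount);

    // Multiple properties plus explicit null handling, which a single
    // Function<V, Comparable<?>> extractor cannot express on its own.
    static final Comparator<Bid> BY_AMOUNT_THEN_BIDDER =
        BY_AMOUNT.thenComparing(Bid::bidder, Comparator.nullsFirst(Comparator.naturalOrder()));

    // Returns the largest bid according to the given comparator.
    static Bid max(List<Bid> bids, Comparator<Bid> comparator) {
        return bids.stream().max(comparator).orElseThrow();
    }

    public static void main(String[] args) {
        List<Bid> bids = List.of(new Bid("A", 90.0), new Bid(null, 100.0), new Bid("B", 100.0));
        System.out.println(max(bids, BY_AMOUNT_THEN_BIDDER)); // Bid[bidder=B, amount=100.0]
    }
}
```

In other words, an API that accepts `Comparator<V>` still lets callers write the extractor style as `Comparator.comparing(extractor)`, at the cost of one extra call.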
> 
> On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> I was playing with the code a little bit, but it seems not to be easy to
>> use generics to enforce that V is `Comparable`...
>>
>> We would need to introduce a new interface:
>>
>>   interface ComparableStream<K, V extends Comparable<V>>
>>       extends KStream<K, V> {
>>     KTable<K, V> min();
>>   }
>>
>> But it also requires a nasty cast to actually use it:
>>
>>   KStream<String, String> stream =
>>     new StreamsBuilder().stream("");
>>   KTable<String, String> table =
>>     ((ComparableStream<String, String>) stream).min();
>>
>> If the value type does not implement `Comparable`, the cast would not
>> compile... Or is there a simpler way to ensure that min() can only be
>> called _if_ V is `Comparable`?
>>
>>
>> So maybe passing in a `Comparator<V>` might be the right way to go;
>> might also be more flexible anyway. -- My original idea was just to
>> maybe avoid the `Comparator` argument, as it would make the API nicer
>> IMHO; fewer parameters is usually better...
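A minimal sketch, using a hypothetical `MiniStream` stand-in rather than the real `KStream`, of why a `Comparator<V>` parameter need not cost much convenience: callers whose values are already `Comparable` can pass the JDK's `Comparator.naturalOrder()`, and the compiler rejects that call when `V` is not `Comparable`, without any interface-level bound or cast.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a stream whose min() takes a Comparator;
// this is not the Kafka Streams API.
class MiniStream<V> {
    private final List<V> values;

    MiniStream(List<V> values) {
        this.values = values;
    }

    // Returns the smallest value; a later value replaces the current
    // minimum only if it is strictly smaller, so ties keep the earlier one.
    V min(Comparator<? super V> comparator) {
        V current = null;
        for (V v : values) {
            if (current == null || comparator.compare(v, current) < 0) {
                current = v;
            }
        }
        return current;
    }
}

class NaturalOrderDemo {
    public static void main(String[] args) {
        MiniStream<String> fruits = new MiniStream<>(List.of("pear", "apple", "fig"));
        // Compiles because String implements Comparable<String>.
        System.out.println(fruits.min(Comparator.naturalOrder()));                // apple
        // Any other ordering is still available, e.g. by length.
        System.out.println(fruits.min(Comparator.comparingInt(String::length))); // fig

        // For a value type that is not Comparable, e.g. MiniStream<Object>,
        // min(Comparator.naturalOrder()) is rejected at compile time.
    }
}
```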
>>
>>
>> I am not sure why we would want to pass `Function<V, Comparable<?>>
>> func` into `min()`?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
>>> Alex,
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>
>>>> Mohan / Matthias,
>>>>
>>>>>> I think extending min/max to non-numeric types makes sense. Wondering
>>>>>> why we should require a `Comparator` or if we should require that the
>>>>>> types implement `Comparable` instead?
>>>>>>
>>>>> Good question. This is what it would look like:
>>>>>
>>>>> KTable<K, V> min_comparable()
>>>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>>>
>>>> Not sure if I understood Matthias' proposal correctly, but I think that
>>>> instead of going with your original proposal
>>>> (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...))
>>>> or mine (KTable<K, V> min(Comparator<V> comparator...)), we could
>>>> simplify it a bit by using a function to extract a Comparable property
>>>> from the original value:
>>>>
>>>> KTable<K, V> min(Function<V, Comparable<?>> func...)
>>>>
>>>
>>> I will let Matthias clarify. I am not sure why it is simpler than the
>>> comparator one. Comparable is implemented by the type, and I am not sure
>>> exposing it this way makes it any better.
>>>
>>>>> I also think that min/max should not change the value type. Using
>>>>> `Long` for sum() makes sense though, and also to require a `<V extends
>>>>> Number>`.
>>>>
>>>> Are there any reasons to limit the sum() to integers? Why not use a
>>>> Double instead?
>>>>
>>> Yeah, if the precision is important, then we should stick with Double.
>>>
>>> -mohan
>>>
>>>> Best regards,
>>>> Alexandre
>>>>
>>>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>
>>>>> Matthias,
>>>>>
>>>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax <mj...@mailbox.org.invalid> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think extending min/max to non-numeric types makes sense. Wondering
>>>>>> why we should require a `Comparator` or if we should require that the
>>>>>> types implement `Comparable` instead?
>>>>>>
>>>>>
>>>>> Good question. This is what it would look like:
>>>>>
>>>>> KTable<K, V> min_comparable()
>>>>> KTable<K, V> min_comparator(Comparator<V> comp)
>>>>>
>>>>> For min_comparable to work, you still need the bound "V extends
>>>>> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type
>>>>> V" warning, it has to be at the interface level like this:
>>>>>
>>>>>  KStream<K, V extends Comparable<V>>
>>>>>
>>>>> which is a little messy unless there is a different way to do the same.
>>>>> The comparator gives a simple way to define an anonymous function.
>>>>>
>>>>> What do you think?
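A minimal sketch of one alternative to an interface-level bound: a static helper can declare `V extends Comparable<? super V>` on its own type parameters, so it neither hides the class's `V` nor needs a cast. `Values` and `ComparableOps` are hypothetical stand-ins, not Kafka Streams types. The sketch uses `Comparable<? super V>` rather than `Comparable<V>` because some common types, such as `java.time.LocalDate` (which implements `Comparable<ChronoLocalDate>`), only satisfy the wider bound.

```java
import java.time.LocalDate;
import java.util.List;

// Hypothetical stand-in for KStream<K, V>: no bound on V at the type level.
final class Values<K, V> {
    final List<V> items;

    Values(List<V> items) {
        this.items = items;
    }
}

final class ComparableOps {
    private ComparableOps() {}

    // The bound lives on the static method's own type parameters, so it
    // neither hides the class's V nor requires a cast; calling it with a
    // non-Comparable V is rejected by the compiler.
    static <K, V extends Comparable<? super V>> V min(Values<K, V> values) {
        V current = null;
        for (V v : values.items) {
            if (current == null || v.compareTo(current) < 0) {
                current = v;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Values<String, LocalDate> dates = new Values<>(List.of(
            LocalDate.of(2021, 8, 15), LocalDate.of(2021, 6, 2), LocalDate.of(2021, 8, 3)));
        System.out.println(min(dates)); // 2021-06-02
    }
}
```

The trade-off is discoverability: a static helper does not show up as a method on the stream itself.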
>>>>>
>>>>>
>>>>>> I also think that min/max should not change the value type. Using
>>>>>> `Long` for sum() makes sense though, and also to require a `<V extends
>>>>>> Number>`.
>>>>>>
>>>>> I guess these are the two possibilities:
>>>>>
>>>>> <E extends Number> Long sum(Function<V, E> func)
>>>>> Long sum(Function<V, Number> func)
>>>>>
>>>>> Both should work. "func" can return any subtype of Number, and I don't
>>>>> see any advantage to the first version. Can you clarify?
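One practical difference between the two signatures, sketched in plain Java with hypothetical `List`-based stand-ins for `sum()`: lambdas and method references adapt to either form, but an existing variable of type `Function<String, Integer>` is only accepted by the generic version, because Java generics are invariant and `Function<String, Integer>` is not a `Function<String, Number>`. A wildcard parameter, `Function<V, ? extends Number>`, would accept both without the extra type parameter.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical List-based stand-ins for the two proposed sum() signatures.
final class SumSignatures {
    private SumSignatures() {}

    // Variant 1: the function's result type is a bounded type parameter.
    static <V, E extends Number> long sumGeneric(List<V> values, Function<V, E> func) {
        long total = 0;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    // Variant 2: the function's result type is fixed to Number.
    static <V> long sumNumber(List<V> values, Function<V, Number> func) {
        long total = 0;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "bb", "ccc");
        Function<String, Integer> length = String::length;

        System.out.println(sumGeneric(words, length));        // 6
        System.out.println(sumNumber(words, String::length)); // 6 (method ref adapts)
        // sumNumber(words, length); // does not compile: a Function<String, Integer>
        //                           // is not a Function<String, Number>
    }
}
```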
>>>>>
>>>>> Thanks
>>>>> Mohan
>>>>>
>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
>>>>>>> Hi Alex,
>>>>>>>
>>>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> My point here is that, when we're only interested in a max/min numeric
>>>>>>>> value, it doesn't matter when we have repeated values, since we'd be
>>>>>>>> only forwarding the number downstream, so I could disregard when the
>>>>>>>> Comparator returns a zero value (meaning equals) and min/max would
>>>>>>>> still be semantically correct. But when we're forwarding the original
>>>>>>>> object downstream instead of its numeric property, it could mean
>>>>>>>> different things semantically depending on how we handle the repeated
>>>>>>>> values.
>>>>>>>>
>>>>>>>> As an example, if I were using max() on a stream of Biddings for
>>>>>>>> products in an auction, the order of the Biddings would probably
>>>>>>>> influence the winner if two clients send Biddings with the same value.
>>>>>>>> If we're only forwarding the Bidding value downstream (a double value
>>>>>>>> of 100, for example), it doesn't matter how repeated values are
>>>>>>>> handled, since the max price for this auction would still be 100.00,
>>>>>>>> no matter which Bidding got selected in the end. But if we're
>>>>>>>> forwarding the Biddings downstream instead, then it matters whether the
>>>>>>>> winning Bidding sent downstream was originally posted by Client A or
>>>>>>>> Client B.
>>>>>>>>
>>>>>>>> I'm not saying that an overloaded method to handle different options
>>>>>>>> for how repeated values should be handled by min/max is mandatory, but
>>>>>>>> the methods' docs should make clear what happens when
>>>>>>>> Comparator.compare() == 0. My preferred option for the default
>>>>>>>> behaviour is to only forward a new value if it is smaller/bigger than
>>>>>>>> the previous min/max value (ignoring compare() == 0), since it would
>>>>>>>> emit fewer values downstream and would be easier to read ("I only send
>>>>>>>> a value downstream if it's bigger/smaller than the previously selected
>>>>>>>> value").
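The two tie-handling policies can be sketched in plain Java as follows; this is illustrative only, with a hypothetical `Bidding` record and helper names rather than any proposed API. Each incoming value is compared with the current maximum, and the only difference is whether `compare() == 0` keeps the earlier value (the default preferred above) or replaces it with the later one.

```java
import java.util.Comparator;
import java.util.List;

final class TieHandling {
    private TieHandling() {}

    // Hypothetical value type from the auction example.
    record Bidding(String client, double amount) {}

    // Replaces the current max only on a strictly bigger value,
    // so when compare() == 0 the earlier Bidding is kept.
    static <V> V maxKeepOldest(List<V> values, Comparator<? super V> cmp) {
        V current = null;
        for (V v : values) {
            if (current == null || cmp.compare(v, current) > 0) {
                current = v;
            }
        }
        return current;
    }

    // Also replaces the current max when compare() == 0,
    // so the later Bidding wins ties.
    static <V> V maxKeepNewest(List<V> values, Comparator<? super V> cmp) {
        V current = null;
        for (V v : values) {
            if (current == null || cmp.compare(v, current) >= 0) {
                current = v;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Bidding> bids = List.of(new Bidding("A", 100.0), new Bidding("B", 100.0));
        Comparator<Bidding> byAmount = Comparator.comparingDouble(Bidding::amount);
        System.out.println(maxKeepOldest(bids, byAmount).client()); // A
        System.out.println(maxKeepNewest(bids, byAmount).client()); // B
    }
}
```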
>>>>>>>>
>>>>>>> Thanks for the clarification. I like your suggestion unless someone
>>>>>>> feels that they want an option to control this (i.e., when
>>>>>>> compare() == 0, return the old value vs. the new value).
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> Not knowing the schema of the value (V) has its own set of problems.
>>>>>>>>> As I have alluded to in the proposal, this is a little bit messy. We
>>>>>>>>> already have "reduce" which can be used to implement sum
>>>>>>>>> (mapValues().reduce()).
>>>>>>>>> Thinking about it more, do you think "sum" would be useful? One hacky
>>>>>>>>> way to implement this is to inspect the type of the return when the
>>>>>>>>> "func" is called the first time OR infer from the materialized or
>>>>>>>>> have an explicit initializer.
>>>>>>>>
>>>>>>>> I think it might be useful for some use cases, yes, but it would be
>>>>>>>> tricky to implement this in a way that handles generic Numbers and
>>>>>>>> keeps their original implementation class. One simplification you
>>>>>>>> could take is fixing VR to be a Double, and then using
>>>>>>>> Number.doubleValue() to compute the sum.
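A minimal sketch, in plain Java with hypothetical helper names, of the precision trade-off between summing via `Number.longValue()` and via `Number.doubleValue()`: the former truncates fractional values, while the latter keeps them but represents integers exactly only up to 2^53.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helpers contrasting two ways to sum arbitrary Numbers.
final class SumPrecision {
    private SumPrecision() {}

    // Truncates each value toward zero before adding it.
    static <V> long sumAsLong(List<V> values, Function<V, ? extends Number> func) {
        long total = 0;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    // Keeps fractional parts, but is subject to double rounding.
    static <V> double sumAsDouble(List<V> values, Function<V, ? extends Number> func) {
        double total = 0.0;
        for (V v : values) {
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(0.5, 0.5, 1.25);
        System.out.println(sumAsLong(prices, p -> p));   // 1 (0 + 0 + 1)
        System.out.println(sumAsDouble(prices, p -> p)); // 2.25

        // 2^53 + 1 is exact as a long but rounds to 2^53 as a double.
        List<Long> big = List.of(1L << 53, 1L);
        System.out.println(sumAsLong(big, x -> x));                          // 9007199254740993
        System.out.println(sumAsDouble(big, x -> x) == (double) (1L << 53)); // true
    }
}
```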
>>>>>>>>
>>>>>>>
>>>>>>> Yeah, that would simplify quite a bit. I think you are suggesting this:
>>>>>>>
>>>>>>> KTable<K,Double> sum(Function<V, Number> func)
>>>>>>>
>>>>>>>
>>>>>>>> What you said about using reduce() to compute a sum() is also true for
>>>>>>>> min() and max(). =) All three methods in this KIP would be syntactic
>>>>>>>> sugar for what could otherwise be implemented using reduce/aggregate,
>>>>>>>> but I see value in implementing them and simplifying the adoption of
>>>>>>>> those use cases.
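To illustrate the "syntactic sugar" point, here is a minimal plain-Java sketch with hypothetical names. It uses `java.util.function.BinaryOperator` as a stand-in for the Kafka Streams `Reducer<V>` (which has the same two-argument shape) and expresses min() and a Double-valued sum() as ordinary reducers folded over the values in arrival order, with sum() first mapping each value to a Double, as a mapValues() step would.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch: min() and sum() written as plain reducers.
final class SugarOverReduce {
    private SugarOverReduce() {}

    // Folds values left to right, like a per-key reduce over arriving records.
    static <V> V reduce(List<V> values, BinaryOperator<V> reducer) {
        V agg = null;
        for (V v : values) {
            agg = (agg == null) ? v : reducer.apply(agg, v);
        }
        return agg;
    }

    // min(): keep the aggregate unless the new value is strictly smaller.
    static <V> BinaryOperator<V> minReducer(Comparator<? super V> cmp) {
        return (agg, next) -> cmp.compare(next, agg) < 0 ? next : agg;
    }

    // sum(): a mapValues()-like step to Double, then reduce with addition.
    static <V> Double sum(List<V> values, Function<V, ? extends Number> func) {
        List<Double> mapped = new ArrayList<>();
        for (V v : values) {
            mapped.add(func.apply(v).doubleValue());
        }
        return reduce(mapped, Double::sum);
    }

    public static void main(String[] args) {
        List<Integer> amounts = List.of(7, 3, 5);
        BinaryOperator<Integer> min = minReducer(Comparator.naturalOrder());
        System.out.println(reduce(amounts, min)); // 3
        System.out.println(sum(amounts, a -> a)); // 15.0
    }
}
```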
>>>>>>>>
>>>>>>> Agreed. I seem to have forgotten the reason why I started this KIP
>>>>>>> :-). There is a long way to go.
>>>>>>>
>>>>>>> -thanks
>>>>>>> Mohan
>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Alexandre
>>>>>>>>
>>>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Alex,
>>>>>>>>>
>>>>>>>>> Responses below.
>>>>>>>>>
>>>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mohan,
>>>>>>>>>>
>>>>>>>>>> I like the idea of adding those methods to the API, but I'd like to
>>>>>>>>>> make a suggestion:
>>>>>>>>>>
>>>>>>>>>> Although the most common scenario for min() / max() might be numeric
>>>>>>>>>> values, I think they could also be useful on other objects like
>>>>>>>>>> Dates, LocalDates or Strings. Why limit the API to Numbers only?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> There was no specific reason; I was just addressing the common
>>>>>>>>> scenario. But I don't see why this can't be supported given your
>>>>>>>>> suggestion below.
>>>>>>>>>
>>>>>>>>>> Extending on the above, couldn't we change the API to take a
>>>>>>>>>> Comparator<V> instead of the Function<V, VR> for those methods, and
>>>>>>>>>> make them return a KTable<K, V> instead? Not only would this approach
>>>>>>>>>> not limit the usage of those methods to Numbers, but it would also
>>>>>>>>>> preserve the origin of the min/max value [1]. The extraction of a
>>>>>>>>>> single (numeric?) value could be achieved by a subsequent
>>>>>>>>>> .mapValues() operator, and this strategy would also allow us to reuse
>>>>>>>>>> the stream's current value serde on min / max, making the
>>>>>>>>>> Materialized an optional parameter.
>>>>>>>>>>
>>>>>>>>> I like your idea, though it is odd that min/max returns KTable<K, V>
>>>>>>>>> instead of KTable<K, VR> (like in count), but mapValues should do the
>>>>>>>>> trick.
>>>>>>>>>
>>>>>>>>>> One extra complication of this approach is that now we'd have to
>>>>>>>>>> handle repeated min/max values from different origins (two
>>>>>>>>>> semantically different objects for which the comparator returns 0),
>>>>>>>>>> but we could solve that by adding a parameter to specify whether to
>>>>>>>>>> use the older or newer value (or assuming one of these options as
>>>>>>>>>> default for a simpler API?).
>>>>>>>>>>
>>>>>>>>> I am not sure whether this complexity is warranted. Why can't we just
>>>>>>>>> stick to the way a regular Comparator works? Can you give me a
>>>>>>>>> real-world example?
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I know it's an implementation issue, but I'm curious about how you'd
>>>>>>>>>> handle the <VR extends Number> on the sum(). Since the multiple
>>>>>>>>>> implementations of this interface don't have a common constructor
>>>>>>>>>> nor an interface method to add two Numbers, would it be possible to
>>>>>>>>>> implement sum() and retain the original VR type on the returned
>>>>>>>>>> KTable?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Not knowing the schema of the value (V) has its own set of problems.
>>>>>>>>> As I have alluded to in the proposal, this is a little bit messy. We
>>>>>>>>> already have "reduce" which can be used to implement sum
>>>>>>>>> (mapValues().reduce()). Thinking about it more, do you think "sum"
>>>>>>>>> would be useful? One hacky way to implement this is to inspect the
>>>>>>>>> type of the return when the "func" is called the first time OR infer
>>>>>>>>> from the materialized or have an explicit initializer.
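A minimal sketch of the "inspect the runtime type" idea in plain Java, under the assumption that all values seen for a key share one concrete `Number` class. `java.lang.Number` has no `add()` method, so each supported subclass needs its own branch; the helper name and the set of supported types are hypothetical choices, and anything else (for example `Short` or `AtomicLong`) is rejected.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical helper: adds two Numbers while keeping their concrete class.
final class NumberAdd {
    private NumberAdd() {}

    static Number add(Number a, Number b) {
        if (a.getClass() != b.getClass()) {
            throw new IllegalArgumentException(
                "mixed Number types: " + a.getClass() + " and " + b.getClass());
        }
        if (a instanceof Integer) {
            return a.intValue() + b.intValue(); // boxed to Integer; may overflow
        }
        if (a instanceof Long) {
            return a.longValue() + b.longValue(); // boxed to Long; may overflow
        }
        if (a instanceof Double) {
            return a.doubleValue() + b.doubleValue();
        }
        if (a instanceof Float) {
            return a.floatValue() + b.floatValue();
        }
        if (a instanceof BigInteger) {
            return ((BigInteger) a).add((BigInteger) b);
        }
        if (a instanceof BigDecimal) {
            return ((BigDecimal) a).add((BigDecimal) b);
        }
        throw new IllegalArgumentException("unsupported Number type: " + a.getClass());
    }

    public static void main(String[] args) {
        System.out.println(add(2, 3).getClass().getSimpleName());              // Integer
        System.out.println(add(new BigDecimal("0.1"), new BigDecimal("0.2"))); // 0.3
    }
}
```

The per-type branching and the overflow and mixed-type corner cases are what make this approach "hacky" compared with fixing the result type to Double.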
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Mohan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> [1]: An example scenario for this would be to find the min / max
>>>>>>>>>> Bidding for a product where, at the end of the auction, I need not
>>>>>>>>>> only the min / max value of said Bidding, but also the bidder's
>>>>>>>>>> contact information.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Alexandre
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I have created a proposal for adding some additional aggregation
>>>>>>>>>>> APIs like count.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
>>>>>>>>>>>
>>>>>>>>>>> I have noted down some of the issues that need discussion. Thanks to
>>>>>>>>>>> Matthias for helping me with the scope of the proposal.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Mohan
>>>>>>>>>>>
> 
